Repository: orc
Updated Branches:
  refs/heads/master 03b9ce8de -> ebf89f571


ORC-224: Implement column writers of primitive types

Fixes #149

Signed-off-by: Deepak Majeti <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/ebf89f57
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/ebf89f57
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/ebf89f57

Branch: refs/heads/master
Commit: ebf89f571110d4ab47ee4a99bbf15d3a4262c113
Parents: 03b9ce8
Author: Gang Wu <[email protected]>
Authored: Tue Aug 8 15:07:06 2017 -0700
Committer: Deepak Majeti <[email protected]>
Committed: Tue Nov 7 11:23:46 2017 -0500

----------------------------------------------------------------------
 c++/include/orc/Int128.hh |   17 +
 c++/include/orc/Vector.hh |    2 +
 c++/src/ColumnWriter.cc   | 1246 ++++++++++++++++++++++++++++++++++++++--
 c++/src/ColumnWriter.hh   |    2 +-
 c++/src/Exceptions.cc     |   19 +
 c++/src/Exceptions.hh     |   10 +
 c++/src/Writer.cc         |    9 +-
 c++/test/TestWriter.cc    |  611 +++++++++++++++++++-
 8 files changed, 1853 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/orc/blob/ebf89f57/c++/include/orc/Int128.hh
----------------------------------------------------------------------
diff --git a/c++/include/orc/Int128.hh b/c++/include/orc/Int128.hh
index 14215b4..160f1b0 100644
--- a/c++/include/orc/Int128.hh
+++ b/c++/include/orc/Int128.hh
@@ -93,6 +93,12 @@ namespace orc {
       return *this;
     }
 
+    Int128 abs() const {
+      Int128 value = *this;
+      value.abs();
+      return value;
+    }
+
     Int128& invert() {
       lowbits = ~lowbits;
       highbits = ~highbits;
@@ -173,6 +179,17 @@ namespace orc {
     }
 
     /**
+     * Logical and between two Int128.
+     * @param right the number to and in
+     * @return logical and result
+     */
+    Int128 operator&(const Int128 &right) {
+      Int128 value = *this;
+      value &= right;
+      return value;
+    }
+
+    /**
      * Shift left by the given number of bits.
      * Values larger than 2**127 will shift into the sign bit.
      */

http://git-wip-us.apache.org/repos/asf/orc/blob/ebf89f57/c++/include/orc/Vector.hh
----------------------------------------------------------------------
diff --git a/c++/include/orc/Vector.hh b/c++/include/orc/Vector.hh
index f3f1343..65101db 100644
--- a/c++/include/orc/Vector.hh
+++ b/c++/include/orc/Vector.hh
@@ -216,6 +216,7 @@ namespace orc {
      */
     DataBuffer<int64_t> readScales;
     friend class Decimal64ColumnReader;
+    friend class Decimal64ColumnWriter;
   };
 
   struct Decimal128VectorBatch: public ColumnVectorBatch {
@@ -241,6 +242,7 @@ namespace orc {
     DataBuffer<int64_t> readScales;
     friend class Decimal128ColumnReader;
     friend class DecimalHive11ColumnReader;
+    friend class Decimal128ColumnWriter;
   };
 
   /**

http://git-wip-us.apache.org/repos/asf/orc/blob/ebf89f57/c++/src/ColumnWriter.cc
----------------------------------------------------------------------
diff --git a/c++/src/ColumnWriter.cc b/c++/src/ColumnWriter.cc
index ad18b0c..60878c3 100644
--- a/c++/src/ColumnWriter.cc
+++ b/c++/src/ColumnWriter.cc
@@ -179,14 +179,16 @@ namespace orc {
     notNullEncoder->recordPosition(rowIndexPosition.get());
   }
 
-  void ColumnWriter::resetIndex() {
-    // clear row index
-    rowIndex->clear_entry();
-    rowIndexEntry->clear_positions();
-    rowIndexEntry->clear_statistics();
+  void ColumnWriter::reset() {
+    if (enableIndex) {
+      // clear row index
+      rowIndex->clear_entry();
+      rowIndexEntry->clear_positions();
+      rowIndexEntry->clear_statistics();
 
-    // write current positions
-    recordPosition();
+      // write current positions
+      recordPosition();
+    }
   }
 
   class StructColumnWriter : public ColumnWriter {
@@ -222,7 +224,7 @@ namespace orc {
     virtual void writeIndex(
       std::vector<proto::Stream> &streams) const override;
 
-    virtual void resetIndex() override;
+    virtual void reset() override;
 
   private:
     std::vector<ColumnWriter *> children;
@@ -250,24 +252,26 @@ namespace orc {
   }
 
   void StructColumnWriter::add(
-                              ColumnVectorBatch& rowBatch,
-                              uint64_t offset,
-                              uint64_t numValues) {
+                               ColumnVectorBatch& rowBatch,
+                               uint64_t offset,
+                               uint64_t numValues) {
     ColumnWriter::add(rowBatch, offset, numValues);
-
-    const StructVectorBatch & structBatch =
-          dynamic_cast<const StructVectorBatch &>(rowBatch);
+    const StructVectorBatch* structBatch =
+      dynamic_cast<const StructVectorBatch *>(&rowBatch);
+    if (structBatch == nullptr) {
+      throw InvalidArgument("Failed to cast to StructVectorBatch");
+    }
 
     for (uint32_t i = 0; i < children.size(); ++i) {
-      children[i]->add(*structBatch.fields[i], offset, numValues);
+      children[i]->add(*structBatch->fields[i], offset, numValues);
     }
 
     // update stats
     bool hasNull = false;
-    if (!structBatch.hasNulls) {
+    if (!structBatch->hasNulls) {
       colIndexStatistics->increase(numValues);
     } else {
-      const char* notNull = structBatch.notNull.data() + offset;
+      const char* notNull = structBatch->notNull.data() + offset;
       for (uint64_t i = 0; i < numValues; ++i) {
         if (notNull[i]) {
           colIndexStatistics->increase(1);
@@ -355,11 +359,11 @@ namespace orc {
     }
   }
 
-  void StructColumnWriter::resetIndex() {
-    ColumnWriter::resetIndex();
+  void StructColumnWriter::reset() {
+    ColumnWriter::reset();
 
     for (uint32_t i = 0; i < children.size(); ++i) {
-      children[i]->resetIndex();
+      children[i]->reset();
     }
   }
 
@@ -391,11 +395,11 @@ namespace orc {
   };
 
   IntegerColumnWriter::IntegerColumnWriter(
-                        const Type& type,
-                        const StreamsFactory& factory,
-                        const WriterOptions& options) :
-                          ColumnWriter(type, factory, options),
-                          rleVersion(RleVersion_1) {
+                           const Type& type,
+                           const StreamsFactory& factory,
+                           const WriterOptions& options) :
+                             ColumnWriter(type, factory, options),
+                             rleVersion(RleVersion_1) {
     std::unique_ptr<BufferedOutputStream> dataStream =
       factory.createStream(proto::Stream_Kind_DATA);
     rleEncoder = createRleEncoder(
@@ -410,31 +414,40 @@ namespace orc {
   }
 
   void IntegerColumnWriter::add(
-                              ColumnVectorBatch& rowBatch,
-                              uint64_t offset,
-                              uint64_t numValues) {
+                                ColumnVectorBatch& rowBatch,
+                                uint64_t offset,
+                                uint64_t numValues) {
     ColumnWriter::add(rowBatch, offset, numValues);
 
-    const LongVectorBatch & longBatch =
-                    dynamic_cast<const LongVectorBatch &>(rowBatch);
+    const LongVectorBatch* longBatch =
+      dynamic_cast<const LongVectorBatch*>(&rowBatch);
+    if (longBatch == nullptr) {
+      throw InvalidArgument("Failed to cast to LongVectorBatch");
+    }
 
-    const int64_t* data = longBatch.data.data() + offset;
-    const char* notNull = longBatch.hasNulls ?
-                          longBatch.notNull.data() + offset : nullptr;
+    const int64_t* data = longBatch->data.data() + offset;
+    const char* notNull = longBatch->hasNulls ?
+                          longBatch->notNull.data() + offset : nullptr;
 
     rleEncoder->add(data, numValues, notNull);
 
     // update stats
     IntegerColumnStatisticsImpl* intStats =
       dynamic_cast<IntegerColumnStatisticsImpl*>(colIndexStatistics.get());
+    if (intStats == nullptr) {
+      throw InvalidArgument("Failed to cast to IntegerColumnStatisticsImpl");
+    }
+
+    bool hasNull = false;
     for (uint64_t i = 0; i < numValues; ++i) {
       if (notNull == nullptr || notNull[i]) {
         intStats->increase(1);
         intStats->update(data[i], 1);
-      } else if (!intStats->hasNull()) {
-        intStats->setHasNull(true);
+      } else if (!hasNull) {
+        hasNull = true;
       }
     }
+    intStats->setHasNull(hasNull);
   }
 
   void IntegerColumnWriter::flush(std::vector<proto::Stream>& streams) {
@@ -468,25 +481,1150 @@ namespace orc {
     rleEncoder->recordPosition(rowIndexPosition.get());
   }
 
-  std::unique_ptr<ColumnWriter> buildWriter(
-                                            const Type& type,
-                                            const StreamsFactory& factory,
-                                            const WriterOptions& options) {
-    switch (static_cast<int64_t>(type.getKind())) {
-      case STRUCT:
-        return std::unique_ptr<ColumnWriter>(
-          new StructColumnWriter(
-                                 type,
-                                 factory,
-                                 options));
-      case INT:
-      case LONG:
-      case SHORT:
-        return std::unique_ptr<ColumnWriter>(
-          new IntegerColumnWriter(
-                                  type,
-                                  factory,
-                                  options));
+  class ByteColumnWriter : public ColumnWriter {
+  public:
+    ByteColumnWriter(const Type& type,
+                     const StreamsFactory& factory,
+                     const WriterOptions& options);
+
+    virtual void add(ColumnVectorBatch& rowBatch,
+                     uint64_t offset,
+                     uint64_t numValues) override;
+
+    virtual void flush(std::vector<proto::Stream>& streams) override;
+
+    virtual uint64_t getEstimatedSize() const override;
+
+    virtual void getColumnEncoding(
+            std::vector<proto::ColumnEncoding>& encodings) const override;
+
+    virtual void recordPosition() const override;
+
+  private:
+    std::unique_ptr<ByteRleEncoder> byteRleEncoder;
+  };
+
+  ByteColumnWriter::ByteColumnWriter(
+                        const Type& type,
+                        const StreamsFactory& factory,
+                        const WriterOptions& options) :
+                             ColumnWriter(type, factory, options) {
+    std::unique_ptr<BufferedOutputStream> dataStream =
+                                  
factory.createStream(proto::Stream_Kind_DATA);
+    byteRleEncoder = createByteRleEncoder(std::move(dataStream));
+
+    if (enableIndex) {
+      recordPosition();
+    }
+  }
+
+  void ByteColumnWriter::add(ColumnVectorBatch& rowBatch,
+                             uint64_t offset,
+                             uint64_t numValues) {
+    ColumnWriter::add(rowBatch, offset, numValues);
+
+    LongVectorBatch* byteBatch = dynamic_cast<LongVectorBatch*>(&rowBatch);
+    if (byteBatch == nullptr) {
+      throw InvalidArgument("Failed to cast to LongVectorBatch");
+    }
+
+    int64_t* data = byteBatch->data.data() + offset;
+    const char* notNull = byteBatch->hasNulls ?
+                          byteBatch->notNull.data() + offset : nullptr;
+
+    char* byteData = reinterpret_cast<char*>(data);
+    for (uint64_t i = 0; i < numValues; ++i) {
+      byteData[i] = static_cast<char>(data[i]);
+    }
+    byteRleEncoder->add(byteData, numValues, notNull);
+
+    IntegerColumnStatisticsImpl* intStats =
+        dynamic_cast<IntegerColumnStatisticsImpl*>(colIndexStatistics.get());
+    if (intStats == nullptr) {
+      throw InvalidArgument("Failed to cast to IntegerColumnStatisticsImpl");
+    }
+    bool hasNull = false;
+    for (uint64_t i = 0; i < numValues; ++i) {
+      if (notNull == nullptr || notNull[i]) {
+        intStats->increase(1);
+        intStats->update(static_cast<int64_t>(byteData[i]), 1);
+      } else if (!hasNull) {
+        hasNull = true;
+      }
+    }
+    intStats->setHasNull(hasNull);
+  }
+
+  void ByteColumnWriter::flush(std::vector<proto::Stream>& streams) {
+    ColumnWriter::flush(streams);
+
+    proto::Stream stream;
+    stream.set_kind(proto::Stream_Kind_DATA);
+    stream.set_column(static_cast<uint32_t>(columnId));
+    stream.set_length(byteRleEncoder->flush());
+    streams.push_back(stream);
+  }
+
+  uint64_t ByteColumnWriter::getEstimatedSize() const {
+    uint64_t size = ColumnWriter::getEstimatedSize();
+    size += byteRleEncoder->getBufferSize();
+    return size;
+  }
+
+  void ByteColumnWriter::getColumnEncoding(
+    std::vector<proto::ColumnEncoding>& encodings) const {
+    proto::ColumnEncoding encoding;
+    encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
+    encoding.set_dictionarysize(0);
+    encodings.push_back(encoding);
+  }
+
+  void ByteColumnWriter::recordPosition() const {
+    ColumnWriter::recordPosition();
+    byteRleEncoder->recordPosition(rowIndexPosition.get());
+  }
+
+  class BooleanColumnWriter : public ColumnWriter {
+  public:
+    BooleanColumnWriter(const Type& type,
+                        const StreamsFactory& factory,
+                        const WriterOptions& options);
+
+    virtual void add(ColumnVectorBatch& rowBatch,
+                     uint64_t offset,
+                     uint64_t numValues) override;
+
+    virtual void flush(std::vector<proto::Stream>& streams) override;
+
+    virtual uint64_t getEstimatedSize() const override;
+
+    virtual void getColumnEncoding(
+        std::vector<proto::ColumnEncoding>& encodings) const override;
+
+    virtual void recordPosition() const override;
+
+  private:
+    std::unique_ptr<ByteRleEncoder> rleEncoder;
+  };
+
+  BooleanColumnWriter::BooleanColumnWriter(
+                           const Type& type,
+                           const StreamsFactory& factory,
+                           const WriterOptions& options) :
+                               ColumnWriter(type, factory, options) {
+    std::unique_ptr<BufferedOutputStream> dataStream =
+      factory.createStream(proto::Stream_Kind_DATA);
+    rleEncoder = createBooleanRleEncoder(std::move(dataStream));
+
+    if (enableIndex) {
+      recordPosition();
+    }
+  }
+
+  void BooleanColumnWriter::add(ColumnVectorBatch& rowBatch,
+                                uint64_t offset,
+                                uint64_t numValues) {
+    ColumnWriter::add(rowBatch, offset, numValues);
+
+    LongVectorBatch* byteBatch = dynamic_cast<LongVectorBatch*>(&rowBatch);
+    if (byteBatch == nullptr) {
+      throw InvalidArgument("Failed to cast to LongVectorBatch");
+    }
+    int64_t* data = byteBatch->data.data() + offset;
+    const char* notNull = byteBatch->hasNulls ?
+                          byteBatch->notNull.data() + offset : nullptr;
+
+    char* byteData = reinterpret_cast<char*>(data);
+    for (uint64_t i = 0; i < numValues; ++i) {
+      byteData[i] = static_cast<char>(data[i]);
+    }
+    rleEncoder->add(byteData, numValues, notNull);
+
+    BooleanColumnStatisticsImpl* boolStats =
+        dynamic_cast<BooleanColumnStatisticsImpl*>(colIndexStatistics.get());
+    if (boolStats == nullptr) {
+      throw InvalidArgument("Failed to cast to BooleanColumnStatisticsImpl");
+    }
+    bool hasNull = false;
+    for (uint64_t i = 0; i < numValues; ++i) {
+      if (notNull == nullptr || notNull[i]) {
+        boolStats->increase(1);
+        boolStats->update(byteData[i], 1);
+      } else if (!hasNull) {
+        hasNull = true;
+      }
+    }
+    boolStats->setHasNull(hasNull);
+  }
+
+  void BooleanColumnWriter::flush(std::vector<proto::Stream>& streams) {
+    ColumnWriter::flush(streams);
+
+    proto::Stream stream;
+    stream.set_kind(proto::Stream_Kind_DATA);
+    stream.set_column(static_cast<uint32_t>(columnId));
+    stream.set_length(rleEncoder->flush());
+    streams.push_back(stream);
+  }
+
+  uint64_t BooleanColumnWriter::getEstimatedSize() const {
+    uint64_t size = ColumnWriter::getEstimatedSize();
+    size += rleEncoder->getBufferSize();
+    return size;
+  }
+
+  void BooleanColumnWriter::getColumnEncoding(
+                       std::vector<proto::ColumnEncoding>& encodings) const {
+    proto::ColumnEncoding encoding;
+    encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
+    encoding.set_dictionarysize(0);
+    encodings.push_back(encoding);
+  }
+
+  void BooleanColumnWriter::recordPosition() const {
+    ColumnWriter::recordPosition();
+    rleEncoder->recordPosition(rowIndexPosition.get());
+  }
+
+  class DoubleColumnWriter : public ColumnWriter {
+  public:
+    DoubleColumnWriter(const Type& type,
+                       const StreamsFactory& factory,
+                       const WriterOptions& options,
+                       bool isFloat);
+
+    virtual void add(ColumnVectorBatch& rowBatch,
+                     uint64_t offset,
+                     uint64_t numValues) override;
+
+    virtual void flush(std::vector<proto::Stream>& streams) override;
+
+    virtual uint64_t getEstimatedSize() const override;
+
+    virtual void getColumnEncoding(
+        std::vector<proto::ColumnEncoding>& encodings) const override;
+
+    virtual void recordPosition() const override;
+
+  private:
+    bool isFloat;
+    std::unique_ptr<AppendOnlyBufferedStream> dataStream;
+    DataBuffer<char> buffer;
+  };
+
+  DoubleColumnWriter::DoubleColumnWriter(
+                          const Type& type,
+                          const StreamsFactory& factory,
+                          const WriterOptions& options,
+                          bool isFloatType) :
+                              ColumnWriter(type, factory, options),
+                              isFloat(isFloatType),
+                              buffer(*options.getMemoryPool()) {
+    dataStream.reset(new AppendOnlyBufferedStream(
+                             factory.createStream(proto::Stream_Kind_DATA)));
+    buffer.resize(isFloat ? 4 : 8);
+
+    if (enableIndex) {
+      recordPosition();
+    }
+  }
+
+  // Floating point types are stored using IEEE 754 floating point bit layout.
+  // Float columns use 4 bytes per value and double columns use 8 bytes.
+  template <typename FLOAT_TYPE, typename INTEGER_TYPE>
+  inline void encodeFloatNum(FLOAT_TYPE input, char* output) {
+    INTEGER_TYPE* intBits = reinterpret_cast<INTEGER_TYPE*>(&input);
+    for (size_t i = 0; i < sizeof(INTEGER_TYPE); ++i) {
+      output[i] = static_cast<char>(((*intBits) >> (8 * i)) & 0xff);
+    }
+  }
+
+  void DoubleColumnWriter::add(ColumnVectorBatch& rowBatch,
+                               uint64_t offset,
+                               uint64_t numValues) {
+    ColumnWriter::add(rowBatch, offset, numValues);
+    const DoubleVectorBatch* dblBatch =
+      dynamic_cast<const DoubleVectorBatch*>(&rowBatch);
+    if (dblBatch == nullptr) {
+      throw InvalidArgument("Failed to cast to DoubleVectorBatch");
+    }
+
+    const double* doubleData = dblBatch->data.data() + offset;
+    const char* notNull = dblBatch->hasNulls ?
+                          dblBatch->notNull.data() + offset : nullptr;
+
+    DoubleColumnStatisticsImpl* doubleStats =
+      dynamic_cast<DoubleColumnStatisticsImpl*>(colIndexStatistics.get());
+    if (doubleStats == nullptr) {
+      throw InvalidArgument("Failed to cast to DoubleColumnStatisticsImpl");
+    }
+
+    size_t bytes = isFloat ? 4 : 8;
+    char* data = buffer.data();
+    bool hasNull = false;
+
+    for (uint64_t i = 0; i < numValues; ++i) {
+      if (!notNull || notNull[i]) {
+        if (isFloat) {
+          encodeFloatNum<float, int32_t>(static_cast<float>(doubleData[i]), 
data);
+        } else {
+          encodeFloatNum<double, int64_t>(doubleData[i], data);
+        }
+        dataStream->write(data, bytes);
+
+        doubleStats->increase(1);
+        doubleStats->update(doubleData[i]);
+      } else if (!hasNull) {
+        hasNull = true;
+      }
+    }
+    doubleStats->setHasNull(hasNull);
+  }
+
+  void DoubleColumnWriter::flush(std::vector<proto::Stream>& streams) {
+    ColumnWriter::flush(streams);
+
+    proto::Stream stream;
+    stream.set_kind(proto::Stream_Kind_DATA);
+    stream.set_column(static_cast<uint32_t>(columnId));
+    stream.set_length(dataStream->flush());
+    streams.push_back(stream);
+  }
+
+  uint64_t DoubleColumnWriter::getEstimatedSize() const {
+    uint64_t size = ColumnWriter::getEstimatedSize();
+    size += dataStream->getSize();
+    return size;
+  }
+
+  void DoubleColumnWriter::getColumnEncoding(
+                      std::vector<proto::ColumnEncoding>& encodings) const {
+    proto::ColumnEncoding encoding;
+    encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
+    encoding.set_dictionarysize(0);
+    encodings.push_back(encoding);
+  }
+
+  void DoubleColumnWriter::recordPosition() const {
+    ColumnWriter::recordPosition();
+    dataStream->recordPosition(rowIndexPosition.get());
+  }
+
+  class StringColumnWriter : public ColumnWriter {
+  public:
+    StringColumnWriter(const Type& type,
+                       const StreamsFactory& factory,
+                       const WriterOptions& options);
+
+    virtual void add(ColumnVectorBatch& rowBatch,
+                     uint64_t offset,
+                     uint64_t numValues) override;
+
+    virtual void flush(std::vector<proto::Stream>& streams) override;
+
+    virtual uint64_t getEstimatedSize() const override;
+
+    virtual void getColumnEncoding(
+        std::vector<proto::ColumnEncoding>& encodings) const override;
+
+    virtual void recordPosition() const override;
+
+  protected:
+    std::unique_ptr<RleEncoder> lengthEncoder;
+    std::unique_ptr<AppendOnlyBufferedStream> dataStream;
+    RleVersion rleVersion;
+  };
+
+  StringColumnWriter::StringColumnWriter(
+                          const Type& type,
+                          const StreamsFactory& factory,
+                          const WriterOptions& options) :
+                              ColumnWriter(type, factory, options),
+                              rleVersion(RleVersion_1) {
+    std::unique_ptr<BufferedOutputStream> lengthStream =
+        factory.createStream(proto::Stream_Kind_LENGTH);
+    lengthEncoder = createRleEncoder(std::move(lengthStream),
+                                     false,
+                                     rleVersion,
+                                     memPool);
+    dataStream.reset(new AppendOnlyBufferedStream(
+        factory.createStream(proto::Stream_Kind_DATA)));
+
+    if (enableIndex) {
+      recordPosition();
+    }
+  }
+
+  void StringColumnWriter::add(ColumnVectorBatch& rowBatch,
+                               uint64_t offset,
+                               uint64_t numValues) {
+    ColumnWriter::add(rowBatch, offset, numValues);
+    const StringVectorBatch* stringBatch =
+      dynamic_cast<const StringVectorBatch*>(&rowBatch);
+    if (stringBatch == nullptr) {
+      throw InvalidArgument("Failed to cast to StringVectorBatch");
+    }
+
+    char *const * data = stringBatch->data.data() + offset;
+    const int64_t* length = stringBatch->length.data() + offset;
+    const char* notNull = stringBatch->hasNulls ?
+                          stringBatch->notNull.data() + offset : nullptr;
+
+    lengthEncoder->add(length, numValues, notNull);
+
+    StringColumnStatisticsImpl* strStats =
+      dynamic_cast<StringColumnStatisticsImpl*>(colIndexStatistics.get());
+    if (strStats == nullptr) {
+      throw InvalidArgument("Failed to cast to StringColumnStatisticsImpl");
+    }
+    bool hasNull = false;
+    for (uint64_t i = 0; i < numValues; ++i) {
+      if (!notNull || notNull[i]) {
+        dataStream->write(data[i], static_cast<size_t>(length[i]));
+        strStats->update(data[i], static_cast<size_t>(length[i]));
+        strStats->increase(1);
+      } else if (!hasNull) {
+        hasNull = true;
+      }
+    }
+    strStats->setHasNull(hasNull);
+  }
+
+  void StringColumnWriter::flush(std::vector<proto::Stream>& streams) {
+    ColumnWriter::flush(streams);
+
+    proto::Stream length;
+    length.set_kind(proto::Stream_Kind_LENGTH);
+    length.set_column(static_cast<uint32_t>(columnId));
+    length.set_length(lengthEncoder->flush());
+    streams.push_back(length);
+
+    proto::Stream data;
+    data.set_kind(proto::Stream_Kind_DATA);
+    data.set_column(static_cast<uint32_t>(columnId));
+    data.set_length(dataStream->flush());
+    streams.push_back(data);
+  }
+
+  uint64_t StringColumnWriter::getEstimatedSize() const {
+    uint64_t size = ColumnWriter::getEstimatedSize();
+    size += lengthEncoder->getBufferSize();
+    size += dataStream->getSize();
+    return size;
+  }
+
+  void StringColumnWriter::getColumnEncoding(
+    std::vector<proto::ColumnEncoding>& encodings) const {
+    proto::ColumnEncoding encoding;
+    encoding.set_kind(rleVersion == RleVersion_1 ?
+                      proto::ColumnEncoding_Kind_DIRECT :
+                      proto::ColumnEncoding_Kind_DIRECT_V2);
+    encoding.set_dictionarysize(0);
+    encodings.push_back(encoding);
+  }
+
+  void StringColumnWriter::recordPosition() const {
+    ColumnWriter::recordPosition();
+    dataStream->recordPosition(rowIndexPosition.get());
+    lengthEncoder->recordPosition(rowIndexPosition.get());
+  }
+
+  class CharColumnWriter : public StringColumnWriter {
+  public:
+    CharColumnWriter(const Type& type,
+                     const StreamsFactory& factory,
+                     const WriterOptions& options) :
+                         StringColumnWriter(type, factory, options),
+                         fixedLength(type.getMaximumLength()),
+                         padBuffer(*options.getMemoryPool(),
+                                   type.getMaximumLength()) {
+      // PASS
+    }
+
+    virtual void add(ColumnVectorBatch& rowBatch,
+                     uint64_t offset,
+                     uint64_t numValues) override;
+
+  private:
+    uint64_t fixedLength;
+    DataBuffer<char> padBuffer;
+  };
+
+  void CharColumnWriter::add(ColumnVectorBatch& rowBatch,
+                             uint64_t offset,
+                             uint64_t numValues) {
+    ColumnWriter::add(rowBatch, offset, numValues);
+    StringVectorBatch* charsBatch = 
dynamic_cast<StringVectorBatch*>(&rowBatch);
+    if (charsBatch == nullptr) {
+      throw InvalidArgument("Failed to cast to StringVectorBatch");
+    }
+
+    char** data = charsBatch->data.data() + offset;
+    int64_t* length = charsBatch->length.data() + offset;
+    const char* notNull = charsBatch->hasNulls ?
+                          charsBatch->notNull.data() + offset : nullptr;
+
+    StringColumnStatisticsImpl* strStats =
+        dynamic_cast<StringColumnStatisticsImpl*>(colIndexStatistics.get());
+    if (strStats == nullptr) {
+      throw InvalidArgument("Failed to cast to StringColumnStatisticsImpl");
+    }
+    bool hasNull = false;
+
+    for (uint64_t i = 0; i < numValues; ++i) {
+      if (!notNull || notNull[i]) {
+        char *charData = data[i];
+        uint64_t oriLength = static_cast<uint64_t>(length[i]);
+        if (oriLength < fixedLength) {
+          memcpy(padBuffer.data(), data[i], oriLength);
+          memset(padBuffer.data() + oriLength, ' ', fixedLength - oriLength);
+          charData = padBuffer.data();
+        }
+        length[i] = static_cast<int64_t>(fixedLength);
+        dataStream->write(charData, fixedLength);
+
+        strStats->update(charData, fixedLength);
+        strStats->increase(1);
+      } else if (!hasNull) {
+        hasNull = true;
+      }
+    }
+    lengthEncoder->add(length, numValues, notNull);
+    strStats->setHasNull(hasNull);
+  }
+
+  class VarCharColumnWriter : public StringColumnWriter {
+  public:
+    VarCharColumnWriter(const Type& type,
+                        const StreamsFactory& factory,
+                        const WriterOptions& options) :
+                            StringColumnWriter(type, factory, options),
+                            maxLength(type.getMaximumLength()) {
+      // PASS
+    }
+
+    virtual void add(ColumnVectorBatch& rowBatch,
+                     uint64_t offset,
+                     uint64_t numValues) override;
+
+  private:
+    uint64_t maxLength;
+  };
+
+  void VarCharColumnWriter::add(ColumnVectorBatch& rowBatch,
+                                uint64_t offset,
+                                uint64_t numValues) {
+    ColumnWriter::add(rowBatch, offset, numValues);
+    StringVectorBatch* charsBatch = 
dynamic_cast<StringVectorBatch*>(&rowBatch);
+    if (charsBatch == nullptr) {
+      throw InvalidArgument("Failed to cast to StringVectorBatch");
+    }
+
+    char* const* data = charsBatch->data.data() + offset;
+    int64_t* length = charsBatch->length.data() + offset;
+    const char* notNull = charsBatch->hasNulls ?
+                          charsBatch->notNull.data() + offset : nullptr;
+
+    StringColumnStatisticsImpl* strStats =
+        dynamic_cast<StringColumnStatisticsImpl*>(colIndexStatistics.get());
+    if (strStats == nullptr) {
+      throw InvalidArgument("Failed to cast to StringColumnStatisticsImpl");
+    }
+    bool hasNull = false;
+
+    for (uint64_t i = 0; i < numValues; ++i) {
+      if (!notNull || notNull[i]) {
+        if (length[i] > static_cast<int64_t>(maxLength)) {
+          length[i] = static_cast<int64_t>(maxLength);
+        }
+        dataStream->write(data[i], static_cast<size_t>(length[i]));
+
+        strStats->update(data[i], static_cast<size_t>(length[i]));
+        strStats->increase(1);
+      } else if (!hasNull) {
+        hasNull = true;
+      }
+    }
+    lengthEncoder->add(length, numValues, notNull);
+    strStats->setHasNull(hasNull);
+  }
+
+  class BinaryColumnWriter : public StringColumnWriter {
+  public:
+    BinaryColumnWriter(const Type& type,
+                       const StreamsFactory& factory,
+                       const WriterOptions& options) :
+                           StringColumnWriter(type, factory, options) {
+      // PASS
+    }
+
+    virtual void add(ColumnVectorBatch& rowBatch,
+                     uint64_t offset,
+                     uint64_t numValues) override;
+  };
+
+  void BinaryColumnWriter::add(ColumnVectorBatch& rowBatch,
+                               uint64_t offset,
+                               uint64_t numValues) {
+    ColumnWriter::add(rowBatch, offset, numValues);
+
+    StringVectorBatch* binBatch = dynamic_cast<StringVectorBatch*>(&rowBatch);
+    if (binBatch == nullptr) {
+      throw InvalidArgument("Failed to cast to StringVectorBatch");
+    }
+    char** data = binBatch->data.data() + offset;
+    int64_t* length = binBatch->length.data() + offset;
+    const char* notNull = binBatch->hasNulls ?
+                          binBatch->notNull.data() + offset : nullptr;
+
+    BinaryColumnStatisticsImpl* binStats =
+        dynamic_cast<BinaryColumnStatisticsImpl*>(colIndexStatistics.get());
+    if (binStats == nullptr) {
+      throw InvalidArgument("Failed to cast to BinaryColumnStatisticsImpl");
+    }
+
+    bool hasNull = false;
+    for (uint64_t i = 0; i < numValues; ++i) {
+      uint64_t unsignedLength = static_cast<uint64_t>(length[i]);
+      if (!notNull || notNull[i]) {
+        dataStream->write(data[i], unsignedLength);
+
+        binStats->update(unsignedLength);
+        binStats->increase(1);
+      } else if (!hasNull) {
+        hasNull = true;
+      }
+    }
+    lengthEncoder->add(length, numValues, notNull);
+    binStats->setHasNull(hasNull);
+  }
+
+  class TimestampColumnWriter : public ColumnWriter {
+  public:
+    TimestampColumnWriter(const Type& type,
+                          const StreamsFactory& factory,
+                          const WriterOptions& options);
+
+    virtual void add(ColumnVectorBatch& rowBatch,
+                     uint64_t offset,
+                     uint64_t numValues) override;
+
+    virtual void flush(std::vector<proto::Stream>& streams) override;
+
+    virtual uint64_t getEstimatedSize() const override;
+
+    virtual void getColumnEncoding(
+        std::vector<proto::ColumnEncoding>& encodings) const override;
+
+    virtual void recordPosition() const override;
+
+  protected:
+    std::unique_ptr<RleEncoder> secRleEncoder, nanoRleEncoder;
+
+  private:
+    RleVersion rleVersion;
+    const Timezone& timezone;
+  };
+
+  TimestampColumnWriter::TimestampColumnWriter(
+                             const Type& type,
+                             const StreamsFactory& factory,
+                             const WriterOptions& options) :
+                                 ColumnWriter(type, factory, options),
+                                 rleVersion(RleVersion_1),
+                                 timezone(getLocalTimezone()){
+    std::unique_ptr<BufferedOutputStream> dataStream =
+        factory.createStream(proto::Stream_Kind_DATA);
+    std::unique_ptr<BufferedOutputStream> secondaryStream =
+        factory.createStream(proto::Stream_Kind_SECONDARY);
+    secRleEncoder = createRleEncoder(std::move(dataStream),
+                                     true,
+                                     rleVersion,
+                                     memPool);
+    nanoRleEncoder = createRleEncoder(std::move(secondaryStream),
+                                      false,
+                                      rleVersion,
+                                      memPool);
+
+    if (enableIndex) {
+      recordPosition();
+    }
+  }
+
+  // Because the number of nanoseconds often has a large number of trailing 
zeros,
+  // the number has trailing decimal zero digits removed and the last three 
bits
+  // are used to record how many zeros were removed. Thus 1000 nanoseconds 
would
+  // be serialized as 0x0b and 100000 would be serialized as 0x0d.
+  static int64_t formatNano(int64_t nanos) {
+    if (nanos == 0) {
+      return 0;
+    } else if (nanos % 100 != 0) {
+      return (nanos) << 3;
+    } else {
+      nanos /= 100;
+      int64_t trailingZeros = 1;
+      while (nanos % 10 == 0 && trailingZeros < 7) {
+        nanos /= 10;
+        trailingZeros += 1;
+      }
+      return (nanos) << 3 | trailingZeros;
+    }
+  }
+
+  void TimestampColumnWriter::add(ColumnVectorBatch& rowBatch,
+                                  uint64_t offset,
+                                  uint64_t numValues) {
+    ColumnWriter::add(rowBatch, offset, numValues);
+    TimestampVectorBatch* tsBatch =
+      dynamic_cast<TimestampVectorBatch*>(&rowBatch);
+    if (tsBatch == nullptr) {
+      throw InvalidArgument("Failed to cast to TimestampVectorBatch");
+    }
+
+    const char* notNull = tsBatch->hasNulls ?
+                          tsBatch->notNull.data() + offset : nullptr;
+    int64_t *secs = tsBatch->data.data() + offset;
+    int64_t *nanos = tsBatch->nanoseconds.data() + offset;
+
+    TimestampColumnStatisticsImpl* tsStats =
+        dynamic_cast<TimestampColumnStatisticsImpl*>(colIndexStatistics.get());
+    if (tsStats == nullptr) {
+      throw InvalidArgument("Failed to cast to TimestampColumnStatisticsImpl");
+    }
+    bool hasNull = false;
+    for (uint64_t i = 0; i < numValues; ++i) {
+      if (notNull == nullptr || notNull[i]) {
+        // TimestampVectorBatch already stores data in UTC
+        int64_t millsUTC = secs[i] * 1000 + nanos[i] / 1000000;
+        tsStats->increase(1);
+        tsStats->update(millsUTC);
+
+        secs[i] -= timezone.getVariant(secs[i]).gmtOffset;
+        secs[i] -= timezone.getEpoch();
+        nanos[i] = formatNano(nanos[i]);
+      } else if (!hasNull) {
+        hasNull = true;
+      }
+    }
+    tsStats->setHasNull(hasNull);
+
+    secRleEncoder->add(secs, numValues, notNull);
+    nanoRleEncoder->add(nanos, numValues, notNull);
+  }
+
+  void TimestampColumnWriter::flush(std::vector<proto::Stream>& streams) {
+    ColumnWriter::flush(streams);
+
+    proto::Stream dataStream;
+    dataStream.set_kind(proto::Stream_Kind_DATA);
+    dataStream.set_column(static_cast<uint32_t>(columnId));
+    dataStream.set_length(secRleEncoder->flush());
+    streams.push_back(dataStream);
+
+    proto::Stream secondaryStream;
+    secondaryStream.set_kind(proto::Stream_Kind_SECONDARY);
+    secondaryStream.set_column(static_cast<uint32_t>(columnId));
+    secondaryStream.set_length(nanoRleEncoder->flush());
+    streams.push_back(secondaryStream);
+  }
+
+  uint64_t TimestampColumnWriter::getEstimatedSize() const {
+    uint64_t size = ColumnWriter::getEstimatedSize();
+    size += secRleEncoder->getBufferSize();
+    size += nanoRleEncoder->getBufferSize();
+    return size;
+  }
+
+  void TimestampColumnWriter::getColumnEncoding(
+    std::vector<proto::ColumnEncoding>& encodings) const {
+    proto::ColumnEncoding encoding;
+    encoding.set_kind(rleVersion == RleVersion_1 ?
+                      proto::ColumnEncoding_Kind_DIRECT :
+                      proto::ColumnEncoding_Kind_DIRECT_V2);
+    encoding.set_dictionarysize(0);
+    encodings.push_back(encoding);
+  }
+
+  void TimestampColumnWriter::recordPosition() const {
+    ColumnWriter::recordPosition();
+    secRleEncoder->recordPosition(rowIndexPosition.get());
+    nanoRleEncoder->recordPosition(rowIndexPosition.get());
+  }
+
+  class DateColumnWriter : public IntegerColumnWriter {
+  public:
+    DateColumnWriter(const Type& type,
+                     const StreamsFactory& factory,
+                     const WriterOptions& options);
+
+    virtual void add(ColumnVectorBatch& rowBatch,
+                     uint64_t offset,
+                     uint64_t numValues) override;
+  };
+
+  DateColumnWriter::DateColumnWriter(
+                        const Type &type,
+                        const StreamsFactory &factory,
+                        const WriterOptions &options) :
+                            IntegerColumnWriter(type, factory, options) {
+    // PASS
+  }
+
+  void DateColumnWriter::add(ColumnVectorBatch& rowBatch,
+                             uint64_t offset,
+                             uint64_t numValues) {
+    ColumnWriter::add(rowBatch, offset, numValues);
+    const LongVectorBatch* longBatch =
+      dynamic_cast<const LongVectorBatch*>(&rowBatch);
+    if (longBatch == nullptr) {
+      throw InvalidArgument("Failed to cast to LongVectorBatch");
+    }
+
+    const int64_t* data = longBatch->data.data() + offset;
+    const char* notNull = longBatch->hasNulls ?
+                          longBatch->notNull.data() + offset : nullptr;
+
+    rleEncoder->add(data, numValues, notNull);
+
+    DateColumnStatisticsImpl* dateStats =
+      dynamic_cast<DateColumnStatisticsImpl*>(colIndexStatistics.get());
+    if (dateStats == nullptr) {
+      throw InvalidArgument("Failed to cast to DateColumnStatisticsImpl");
+    }
+    bool hasNull = false;
+    for (uint64_t i = 0; i < numValues; ++i) {
+      if (!notNull || notNull[i]) {
+        dateStats->increase(1);
+        dateStats->update(static_cast<int32_t>(data[i]));
+      } else if (!hasNull) {
+        hasNull = true;
+      }
+    }
+    dateStats->setHasNull(hasNull);
+  }
+
+  class Decimal64ColumnWriter : public ColumnWriter {
+  public:
+    static const uint32_t MAX_PRECISION_64 = 18;
+    static const uint32_t MAX_PRECISION_128 = 38;
+
+    Decimal64ColumnWriter(const Type& type,
+                          const StreamsFactory& factory,
+                          const WriterOptions& options);
+
+    virtual void add(ColumnVectorBatch& rowBatch,
+                     uint64_t offset,
+                     uint64_t numValues) override;
+
+    virtual void flush(std::vector<proto::Stream>& streams) override;
+
+    virtual uint64_t getEstimatedSize() const override;
+
+    virtual void getColumnEncoding(
+        std::vector<proto::ColumnEncoding>& encodings) const override;
+
+    virtual void recordPosition() const override;
+
+  protected:
+    RleVersion rleVersion;
+    uint64_t precision;
+    uint64_t scale;
+    std::unique_ptr<AppendOnlyBufferedStream> valueStream;
+    std::unique_ptr<RleEncoder> scaleEncoder;
+
+  private:
+    char buffer[8];
+  };
+
+  Decimal64ColumnWriter::Decimal64ColumnWriter(
+                             const Type& type,
+                             const StreamsFactory& factory,
+                             const WriterOptions& options) :
+                                 ColumnWriter(type, factory, options),
+                                 rleVersion(RleVersion_1),
+                                 precision(type.getPrecision()),
+                                 scale(type.getScale()) {
+    valueStream.reset(new AppendOnlyBufferedStream(
+        factory.createStream(proto::Stream_Kind_DATA)));
+    std::unique_ptr<BufferedOutputStream> scaleStream =
+        factory.createStream(proto::Stream_Kind_SECONDARY);
+    scaleEncoder = createRleEncoder(std::move(scaleStream),
+                                    true,
+                                    rleVersion,
+                                    memPool);
+
+    if (enableIndex) {
+      recordPosition();
+    }
+  }
+
+  void Decimal64ColumnWriter::add(ColumnVectorBatch& rowBatch,
+                                  uint64_t offset,
+                                  uint64_t numValues) {
+    ColumnWriter::add(rowBatch, offset, numValues);
+    const Decimal64VectorBatch* decBatch =
+      dynamic_cast<const Decimal64VectorBatch*>(&rowBatch);
+    if (decBatch == nullptr) {
+      throw InvalidArgument("Failed to cast to Decimal64VectorBatch");
+    }
+
+    const char* notNull = decBatch->hasNulls ?
+                          decBatch->notNull.data() + offset : nullptr;
+    const int64_t* values = decBatch->values.data() + offset;
+    DecimalColumnStatisticsImpl* decStats =
+      dynamic_cast<DecimalColumnStatisticsImpl*>(colIndexStatistics.get());
+    if (decStats == nullptr) {
+      throw InvalidArgument("Failed to cast to DecimalColumnStatisticsImpl");
+    }
+    bool hasNull = false;
+
+    for (uint64_t i = 0; i < numValues; ++i) {
+      if (!notNull || notNull[i]) {
+        int64_t val = zigZag(values[i]);
+        char* data = buffer;
+        while (true) {
+          if ((val & ~0x7f) == 0) {
+            *(data++) = (static_cast<char>(val));
+            break;
+          } else {
+            *(data++) = static_cast<char>(0x80 | (val & 0x7f));
+            // cast val to unsigned so as to force 0-fill right shift
+            val = (static_cast<uint64_t>(val) >> 7);
+          }
+        }
+        valueStream->write(buffer, static_cast<size_t>(data - buffer));
+
+        decStats->increase(1);
+        decStats->update(Decimal(values[i], static_cast<int32_t>(scale)));
+      } else if (!hasNull) {
+        hasNull = true;
+      }
+    }
+    std::vector<int64_t> scales(numValues, static_cast<int64_t>(scale));
+    scaleEncoder->add(scales.data(), numValues, notNull);
+
+    decStats->setHasNull(hasNull);
+  }
+
+  void Decimal64ColumnWriter::flush(std::vector<proto::Stream>& streams) {
+    ColumnWriter::flush(streams);
+
+    proto::Stream dataStream;
+    dataStream.set_kind(proto::Stream_Kind_DATA);
+    dataStream.set_column(static_cast<uint32_t>(columnId));
+    dataStream.set_length(valueStream->flush());
+    streams.push_back(dataStream);
+
+    proto::Stream secondaryStream;
+    secondaryStream.set_kind(proto::Stream_Kind_SECONDARY);
+    secondaryStream.set_column(static_cast<uint32_t>(columnId));
+    secondaryStream.set_length(scaleEncoder->flush());
+    streams.push_back(secondaryStream);
+  }
+
+  uint64_t Decimal64ColumnWriter::getEstimatedSize() const {
+    uint64_t size = ColumnWriter::getEstimatedSize();
+    size += valueStream->getSize();
+    size += scaleEncoder->getBufferSize();
+    return size;
+  }
+
+  void Decimal64ColumnWriter::getColumnEncoding(
+    std::vector<proto::ColumnEncoding>& encodings) const {
+    proto::ColumnEncoding encoding;
+    encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
+    encoding.set_dictionarysize(0);
+    encodings.push_back(encoding);
+  }
+
+  void Decimal64ColumnWriter::recordPosition() const {
+    ColumnWriter::recordPosition();
+    valueStream->recordPosition(rowIndexPosition.get());
+    scaleEncoder->recordPosition(rowIndexPosition.get());
+  }
+
+  class Decimal128ColumnWriter : public Decimal64ColumnWriter {
+  public:
+    Decimal128ColumnWriter(const Type& type,
+                           const StreamsFactory& factory,
+                           const WriterOptions& options);
+
+    virtual void add(ColumnVectorBatch& rowBatch,
+                     uint64_t offset,
+                     uint64_t numValues) override;
+
+  private:
+    char buffer[16];
+  };
+
+  Decimal128ColumnWriter::Decimal128ColumnWriter(
+                              const Type& type,
+                              const StreamsFactory& factory,
+                              const WriterOptions& options) :
+                                Decimal64ColumnWriter(type, factory, options) {
+    // PASS
+  }
+
+  // Zigzag encoding moves the sign bit to the least significant bit using the
+  // expression (val « 1) ^ (val » 63) and derives its name from the fact 
that
+  // positive and negative numbers alternate once encoded.
+  Int128 zigZagInt128(const Int128& value) {
+    bool isNegative = value < 0;
+    Int128 val = value.abs();
+    val <<= 1;
+    if (isNegative) {
+      val -= 1;
+    }
+    return val;
+  }
+
+  void Decimal128ColumnWriter::add(ColumnVectorBatch& rowBatch,
+                                   uint64_t offset,
+                                   uint64_t numValues) {
+    ColumnWriter::add(rowBatch, offset, numValues);
+    const Decimal128VectorBatch* decBatch =
+      dynamic_cast<const Decimal128VectorBatch*>(&rowBatch);
+    if (decBatch == nullptr) {
+      throw InvalidArgument("Failed to cast to Decimal128VectorBatch");
+    }
+
+    const char* notNull = decBatch->hasNulls ?
+                          decBatch->notNull.data() + offset : nullptr;
+    const Int128* values = decBatch->values.data() + offset;
+    DecimalColumnStatisticsImpl* decStats =
+      dynamic_cast<DecimalColumnStatisticsImpl*>(colIndexStatistics.get());
+    if (decStats == nullptr) {
+      throw InvalidArgument("Failed to cast to DecimalColumnStatisticsImpl");
+    }
+    bool hasNull = false;
+
+    // The current encoding of decimal columns stores the integer 
representation
+    // of the value as an unbounded length zigzag encoded base 128 varint.
+    for (uint64_t i = 0; i < numValues; ++i) {
+      if (!notNull || notNull[i]) {
+        Int128 val = zigZagInt128(values[i]);
+        char* data = buffer;
+        while (true) {
+          if ((val & ~0x7f) == 0) {
+            *(data++) = (static_cast<char>(val.getLowBits()));
+            break;
+          } else {
+            *(data++) = static_cast<char>(0x80 | (val.getLowBits() & 0x7f));
+            val >>= 7;
+          }
+        }
+        valueStream->write(buffer, static_cast<size_t>(data - buffer));
+
+        decStats->increase(1);
+        decStats->update(Decimal(values[i], static_cast<int32_t>(scale)));
+      } else if (!hasNull) {
+        hasNull = true;
+      }
+    }
+    std::vector<int64_t> scales(numValues, static_cast<int64_t>(scale));
+    scaleEncoder->add(scales.data(), numValues, notNull);
+
+    decStats->setHasNull(hasNull);
+  }
+
+  std::unique_ptr<ColumnWriter> buildWriter(
+                                            const Type& type,
+                                            const StreamsFactory& factory,
+                                            const WriterOptions& options) {
+    switch (static_cast<int64_t>(type.getKind())) {
+      case STRUCT:
+        return std::unique_ptr<ColumnWriter>(
+          new StructColumnWriter(
+                                 type,
+                                 factory,
+                                 options));
+      case INT:
+      case LONG:
+      case SHORT:
+        return std::unique_ptr<ColumnWriter>(
+          new IntegerColumnWriter(
+                                  type,
+                                  factory,
+                                  options));
+      case BYTE:
+        return std::unique_ptr<ColumnWriter>(
+          new ByteColumnWriter(
+                               type,
+                               factory,
+                               options));
+      case BOOLEAN:
+        return std::unique_ptr<ColumnWriter>(
+          new BooleanColumnWriter(
+                                  type,
+                                  factory,
+                                  options));
+      case DOUBLE:
+        return std::unique_ptr<ColumnWriter>(
+          new DoubleColumnWriter(
+                                 type,
+                                 factory,
+                                 options,
+                                 false));
+      case FLOAT:
+        return std::unique_ptr<ColumnWriter>(
+          new DoubleColumnWriter(
+                                 type,
+                                 factory,
+                                 options,
+                                 true));
+      case BINARY:
+        return std::unique_ptr<ColumnWriter>(
+          new BinaryColumnWriter(
+                                 type,
+                                 factory,
+                                 options));
+      case STRING:
+        return std::unique_ptr<ColumnWriter>(
+          new StringColumnWriter(
+                                 type,
+                                 factory,
+                                 options));
+      case CHAR:
+        return std::unique_ptr<ColumnWriter>(
+          new CharColumnWriter(
+                               type,
+                               factory,
+                               options));
+      case VARCHAR:
+        return std::unique_ptr<ColumnWriter>(
+          new VarCharColumnWriter(
+                                  type,
+                                  factory,
+                                  options));
+      case DATE:
+        return std::unique_ptr<ColumnWriter>(
+          new DateColumnWriter(
+                               type,
+                               factory,
+                               options));
+      case TIMESTAMP:
+        return std::unique_ptr<ColumnWriter>(
+          new TimestampColumnWriter(
+                                    type,
+                                    factory,
+                                    options));
+      case DECIMAL:
+        if (type.getPrecision() <= Decimal64ColumnWriter::MAX_PRECISION_64) {
+          return std::unique_ptr<ColumnWriter>(
+            new Decimal64ColumnWriter(
+                                      type,
+                                      factory,
+                                      options));
+        } else if (type.getPrecision() <= 
Decimal64ColumnWriter::MAX_PRECISION_128) {
+          return std::unique_ptr<ColumnWriter>(
+            new Decimal128ColumnWriter(
+                                       type,
+                                       factory,
+                                       options));
+        } else {
+          throw NotImplementedYet("Decimal precision more than 38 is not "
+                                    "supported");
+        }
       default:
         throw NotImplementedYet("Type is not supported yet for creating "
                                   "ColumnWriter.");

http://git-wip-us.apache.org/repos/asf/orc/blob/ebf89f57/c++/src/ColumnWriter.hh
----------------------------------------------------------------------
diff --git a/c++/src/ColumnWriter.hh b/c++/src/ColumnWriter.hh
index 774f9b5..e6f076d 100644
--- a/c++/src/ColumnWriter.hh
+++ b/c++/src/ColumnWriter.hh
@@ -163,7 +163,7 @@ namespace orc {
     /**
      * Reset positions for index
      */
-    virtual void resetIndex();
+    virtual void reset();
 
   protected:
     /**

http://git-wip-us.apache.org/repos/asf/orc/blob/ebf89f57/c++/src/Exceptions.cc
----------------------------------------------------------------------
diff --git a/c++/src/Exceptions.cc b/c++/src/Exceptions.cc
index ae0e3d1..ef6c0c2 100644
--- a/c++/src/Exceptions.cc
+++ b/c++/src/Exceptions.cc
@@ -56,4 +56,23 @@ namespace orc {
   ParseError::~ParseError() noexcept {
     // PASS
   }
+
+  InvalidArgument::InvalidArgument(const std::string& what_arg
+                                   ): runtime_error(what_arg) {
+    // PASS
+  }
+
+  InvalidArgument::InvalidArgument(const char* what_arg
+                                   ): runtime_error(what_arg) {
+    // PASS
+  }
+
+  InvalidArgument::InvalidArgument(const InvalidArgument& error
+                                   ): runtime_error(error) {
+    // PASS
+  }
+
+  InvalidArgument::~InvalidArgument() noexcept {
+    // PASS
+  }
 }

http://git-wip-us.apache.org/repos/asf/orc/blob/ebf89f57/c++/src/Exceptions.hh
----------------------------------------------------------------------
diff --git a/c++/src/Exceptions.hh b/c++/src/Exceptions.hh
index 4706085..34b7818 100644
--- a/c++/src/Exceptions.hh
+++ b/c++/src/Exceptions.hh
@@ -45,6 +45,16 @@ namespace orc {
   private:
     ParseError& operator=(const ParseError&);
   };
+
+  class InvalidArgument: public std::runtime_error {
+  public:
+    explicit InvalidArgument(const std::string& what_arg);
+    explicit InvalidArgument(const char* what_arg);
+    virtual ~InvalidArgument() noexcept;
+    InvalidArgument(const InvalidArgument&);
+  private:
+    InvalidArgument& operator=(const InvalidArgument&);
+  };
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/orc/blob/ebf89f57/c++/src/Writer.cc
----------------------------------------------------------------------
diff --git a/c++/src/Writer.cc b/c++/src/Writer.cc
index a4ae090..22f5750 100644
--- a/c++/src/Writer.cc
+++ b/c++/src/Writer.cc
@@ -369,11 +369,6 @@ namespace orc {
     // write streams like PRESENT, DATA, etc.
     columnWriter->flush(streams);
 
-    // only until all streams are flushed can we reset positions
-    if (options.getEnableIndex()) {
-      columnWriter->resetIndex();
-    }
-
     // generate and write stripe footer
     proto::StripeFooter stripeFooter;
     for (uint32_t i = 0; i < streams.size(); ++i) {
@@ -426,6 +421,8 @@ namespace orc {
     currentOffset = currentOffset + indexLength + dataLength + footerLength;
     totalRows += stripeRows;
 
+    columnWriter->reset();
+
     initStripe();
   }
 
@@ -568,7 +565,7 @@ namespace orc {
   std::unique_ptr<Writer> createWriter(
                                        const Type& type,
                                        OutputStream* stream,
-                                      const WriterOptions& options) {
+                                       const WriterOptions& options) {
     return std::unique_ptr<Writer>(
                                    new WriterImpl(
                                             type,

http://git-wip-us.apache.org/repos/asf/orc/blob/ebf89f57/c++/test/TestWriter.cc
----------------------------------------------------------------------
diff --git a/c++/test/TestWriter.cc b/c++/test/TestWriter.cc
index 3df4626..d401ca3 100644
--- a/c++/test/TestWriter.cc
+++ b/c++/test/TestWriter.cc
@@ -31,7 +31,7 @@
 
 namespace orc {
 
-  const int DEFAULT_MEM_STREAM_SIZE = 10 * 1024 * 1024; // 10M
+  const int DEFAULT_MEM_STREAM_SIZE = 100 * 1024 * 1024; // 100M
 
   std::unique_ptr<Writer> createWriter(
                                       uint64_t stripeSize,
@@ -213,5 +213,612 @@ namespace orc {
     }
     EXPECT_FALSE(rowReader->next(*batch));
   }
-}
 
+  TEST(Writer, writeStringAndBinaryColumn) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool * pool = getDefaultPool();
+    std::unique_ptr<Type> type(Type::buildTypeFromString(
+      "struct<col1:string,col2:binary>"));
+
+    uint64_t stripeSize = 1024;     // 1K
+    uint64_t compressionBlockSize = 1024;      // 1k
+
+    char dataBuffer[327675];
+    uint64_t offset = 0;
+
+    std::unique_ptr<Writer> writer = createWriter(stripeSize,
+                                                  compressionBlockSize,
+                                                  CompressionKind_ZLIB,
+                                                  *type,
+                                                  pool,
+                                                  &memStream);
+    std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(65535);
+    StructVectorBatch * structBatch =
+      dynamic_cast<StructVectorBatch *>(batch.get());
+    StringVectorBatch * strBatch =
+      dynamic_cast<StringVectorBatch *>(structBatch->fields[0]);
+    StringVectorBatch * binBatch =
+      dynamic_cast<StringVectorBatch *>(structBatch->fields[1]);
+
+    for (uint64_t i = 0; i < 65535; ++i) {
+      std::ostringstream os;
+      os << i;
+      strBatch->data[i] = dataBuffer + offset;
+      strBatch->length[i] = static_cast<int64_t>(os.str().size());
+      binBatch->data[i] = dataBuffer + offset;
+      binBatch->length[i] = static_cast<int64_t>(os.str().size());
+      memcpy(dataBuffer + offset, os.str().c_str(), os.str().size());
+      offset += os.str().size();
+    }
+
+    structBatch->numElements = 65535;
+    strBatch->numElements = 65535;
+    binBatch->numElements = 65535;
+
+    writer->add(*batch);
+    writer->close();
+
+    std::unique_ptr<InputStream> inStream(
+      new MemoryInputStream (memStream.getData(),
+                             memStream.getLength()));
+    std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream));
+    std::unique_ptr<RowReader> rowReader = createRowReader(reader.get());
+    EXPECT_EQ(65535, reader->getNumberOfRows());
+
+    batch = rowReader->createRowBatch(65535);
+    EXPECT_EQ(true, rowReader->next(*batch));
+    EXPECT_EQ(65535, batch->numElements);
+
+    for (uint64_t i = 0; i < 65535; ++i) {
+      structBatch = dynamic_cast<StructVectorBatch *>(batch.get());
+      strBatch = dynamic_cast<StringVectorBatch *>(structBatch->fields[0]);
+      binBatch = dynamic_cast<StringVectorBatch *>(structBatch->fields[1]);
+      std::string str(
+        strBatch->data[i],
+        static_cast<size_t>(strBatch->length[i]));
+      std::string bin(
+        binBatch->data[i],
+        static_cast<size_t>(binBatch->length[i]));
+      EXPECT_EQ(i, static_cast<uint64_t>(atoi(str.c_str())));
+      EXPECT_EQ(i, static_cast<uint64_t>(atoi(bin.c_str())));
+    }
+
+    EXPECT_EQ(false, rowReader->next(*batch));
+  }
+
+  TEST(Writer, writeFloatAndDoubleColumn) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool * pool = getDefaultPool();
+    std::unique_ptr<Type> type(Type::buildTypeFromString(
+      "struct<col1:double,col2:float>"));
+
+    uint64_t stripeSize = 16 * 1024;
+    uint64_t compressionBlockSize = 1024;
+    uint64_t rowCount = 655350;
+
+    double data[655350];
+    for (uint64_t i = 0; i < rowCount; ++i) {
+      data[i] = 100000 * (std::rand() * 1.0 / RAND_MAX);
+    }
+
+    std::unique_ptr<Writer> writer = createWriter(stripeSize,
+                                                  compressionBlockSize,
+                                                  CompressionKind_ZLIB,
+                                                  *type,
+                                                  pool,
+                                                  &memStream);
+    std::unique_ptr<ColumnVectorBatch> batch = 
writer->createRowBatch(rowCount);
+    StructVectorBatch * structBatch =
+      dynamic_cast<StructVectorBatch *>(batch.get());
+    DoubleVectorBatch * doubleBatch =
+      dynamic_cast<DoubleVectorBatch *>(structBatch->fields[0]);
+    DoubleVectorBatch * floatBatch =
+      dynamic_cast<DoubleVectorBatch *>(structBatch->fields[1]);
+
+    for (uint64_t i = 0; i < rowCount; ++i) {
+      doubleBatch->data[i] = data[i];
+      floatBatch->data[i] = data[i];
+    }
+
+    structBatch->numElements = rowCount;
+    doubleBatch->numElements = rowCount;
+    floatBatch->numElements = rowCount;
+
+    writer->add(*batch);
+    writer->close();
+
+    std::unique_ptr<InputStream> inStream(
+      new MemoryInputStream (memStream.getData(), memStream.getLength()));
+    std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream));
+    std::unique_ptr<RowReader> rowReader = createRowReader(reader.get());
+    EXPECT_EQ(rowCount, reader->getNumberOfRows());
+
+    batch = rowReader->createRowBatch(rowCount);
+    EXPECT_EQ(true, rowReader->next(*batch));
+    EXPECT_EQ(rowCount, batch->numElements);
+
+    structBatch = dynamic_cast<StructVectorBatch *>(batch.get());
+    doubleBatch = dynamic_cast<DoubleVectorBatch *>(structBatch->fields[0]);
+    floatBatch = dynamic_cast<DoubleVectorBatch *>(structBatch->fields[1]);
+    for (uint64_t i = 0; i < rowCount; ++i) {
+      EXPECT_TRUE(std::abs(data[i] - doubleBatch->data[i]) < 0.000001);
+      EXPECT_TRUE(std::abs(static_cast<float>(data[i]) -
+                           static_cast<float>(floatBatch->data[i])) < 
0.000001f);
+    }
+    EXPECT_EQ(false, rowReader->next(*batch));
+  }
+
+  TEST(Writer, writeShortIntLong) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool * pool = getDefaultPool();
+    std::unique_ptr<Type> type(Type::buildTypeFromString(
+      "struct<col1:smallint,col2:int,col3:bigint>"));
+
+    uint64_t stripeSize = 16 * 1024;
+    uint64_t compressionBlockSize = 1024;
+    uint64_t rowCount = 65535;
+
+    std::unique_ptr<Writer> writer = createWriter(stripeSize,
+                                                  compressionBlockSize,
+                                                  CompressionKind_ZLIB,
+                                                  *type,
+                                                  pool,
+                                                  &memStream);
+    std::unique_ptr<ColumnVectorBatch> batch = 
writer->createRowBatch(rowCount);
+    StructVectorBatch * structBatch =
+      dynamic_cast<StructVectorBatch *>(batch.get());
+    LongVectorBatch * smallIntBatch =
+      dynamic_cast<LongVectorBatch *>(structBatch->fields[0]);
+    LongVectorBatch * intBatch =
+      dynamic_cast<LongVectorBatch *>(structBatch->fields[1]);
+    LongVectorBatch * bigIntBatch =
+      dynamic_cast<LongVectorBatch *>(structBatch->fields[2]);
+
+    for (uint64_t i = 0; i < rowCount; ++i) {
+      smallIntBatch->data[i] = static_cast<int16_t>(i);
+      intBatch->data[i] = static_cast<int32_t>(i);
+      bigIntBatch->data[i] = static_cast<int64_t>(i);
+    }
+    structBatch->numElements = rowCount;
+    smallIntBatch->numElements = rowCount;
+    intBatch->numElements = rowCount;
+    bigIntBatch->numElements = rowCount;
+
+    writer->add(*batch);
+    writer->close();
+
+    std::unique_ptr<InputStream> inStream(
+      new MemoryInputStream (memStream.getData(), memStream.getLength()));
+    std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream));
+    std::unique_ptr<RowReader> rowReader = createRowReader(reader.get());
+    EXPECT_EQ(rowCount, reader->getNumberOfRows());
+
+    batch = rowReader->createRowBatch(rowCount);
+    EXPECT_EQ(true, rowReader->next(*batch));
+
+    structBatch = dynamic_cast<StructVectorBatch *>(batch.get());
+    smallIntBatch = dynamic_cast<LongVectorBatch *>(structBatch->fields[0]);
+    intBatch = dynamic_cast<LongVectorBatch *>(structBatch->fields[1]);
+    bigIntBatch = dynamic_cast<LongVectorBatch *>(structBatch->fields[2]);
+    for (uint64_t i = 0; i < rowCount; ++i) {
+      EXPECT_EQ(static_cast<int16_t>(i), smallIntBatch->data[i]);
+      EXPECT_EQ(static_cast<int32_t>(i), intBatch->data[i]);
+      EXPECT_EQ(static_cast<int64_t>(i), bigIntBatch->data[i]);
+    }
+  }
+
+  TEST(Writer, writeTinyint) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool * pool = getDefaultPool();
+    std::unique_ptr<Type> type(Type::buildTypeFromString(
+      "struct<col1:tinyint>"));
+
+    uint64_t stripeSize = 16 * 1024;
+    uint64_t compressionBlockSize = 1024;
+    uint64_t rowCount = 65535;
+
+    std::unique_ptr<Writer> writer = createWriter(stripeSize,
+                                                  compressionBlockSize,
+                                                  CompressionKind_ZLIB,
+                                                  *type,
+                                                  pool,
+                                                  &memStream);
+    std::unique_ptr<ColumnVectorBatch> batch = 
writer->createRowBatch(rowCount);
+    StructVectorBatch * structBatch =
+      dynamic_cast<StructVectorBatch *>(batch.get());
+    LongVectorBatch * byteBatch =
+      dynamic_cast<LongVectorBatch *>(structBatch->fields[0]);
+
+    for (uint64_t i = 0; i < rowCount; ++i) {
+      byteBatch->data[i] = static_cast<int8_t>(i);
+    }
+    structBatch->numElements = rowCount;
+    byteBatch->numElements = rowCount;
+
+    writer->add(*batch);
+    writer->close();
+
+    std::unique_ptr<InputStream> inStream(
+      new MemoryInputStream (memStream.getData(), memStream.getLength()));
+    std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream));
+    std::unique_ptr<RowReader> rowReader = createRowReader(reader.get());
+    EXPECT_EQ(rowCount, reader->getNumberOfRows());
+
+    batch = rowReader->createRowBatch(rowCount);
+    EXPECT_EQ(true, rowReader->next(*batch));
+
+    structBatch = dynamic_cast<StructVectorBatch *>(batch.get());
+    byteBatch = dynamic_cast<LongVectorBatch *>(structBatch->fields[0]);
+    for (uint64_t i = 0; i < rowCount; ++i) {
+      EXPECT_EQ(static_cast<int8_t>(i), 
static_cast<int8_t>(byteBatch->data[i]));
+    }
+  }
+
+  TEST(Writer, writeBooleanColumn) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool* pool = getDefaultPool();
+    std::unique_ptr<Type> 
type(Type::buildTypeFromString("struct<col1:boolean>"));
+
+    uint64_t stripeSize = 16 * 1024;
+    uint64_t compressionBlockSize = 1024;
+    uint64_t rowCount = 65535;
+
+    std::unique_ptr<Writer> writer = createWriter(stripeSize,
+                                                  compressionBlockSize,
+                                                  CompressionKind_ZLIB,
+                                                  *type,
+                                                  pool,
+                                                  &memStream);
+    std::unique_ptr<ColumnVectorBatch> batch = 
writer->createRowBatch(rowCount);
+    StructVectorBatch * structBatch =
+      dynamic_cast<StructVectorBatch *>(batch.get());
+    LongVectorBatch * byteBatch =
+      dynamic_cast<LongVectorBatch *>(structBatch->fields[0]);
+
+    for (uint64_t i = 0; i < rowCount; ++i) {
+      byteBatch->data[i] = (i % 3) == 0;
+    }
+    structBatch->numElements = rowCount;
+    byteBatch->numElements = rowCount;
+
+    writer->add(*batch);
+    writer->close();
+
+    std::unique_ptr<InputStream> inStream(
+      new MemoryInputStream (memStream.getData(), memStream.getLength()));
+    std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream));
+    std::unique_ptr<RowReader> rowReader = createRowReader(reader.get());
+    EXPECT_EQ(rowCount, reader->getNumberOfRows());
+
+    batch = rowReader->createRowBatch(rowCount);
+    EXPECT_EQ(true, rowReader->next(*batch));
+
+    structBatch = dynamic_cast<StructVectorBatch *>(batch.get());
+    byteBatch = dynamic_cast<LongVectorBatch *>(structBatch->fields[0]);
+    for (uint64_t i = 0; i < rowCount; ++i) {
+      EXPECT_EQ((i % 3) == 0, byteBatch->data[i]);
+    }
+  }
+
+  TEST(Writer, writeDate) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool* pool = getDefaultPool();
+    std::unique_ptr<Type> type(Type::buildTypeFromString("struct<col1:date>"));
+
+    uint64_t stripeSize = 16 * 1024;
+    uint64_t compressionBlockSize = 1024;
+    uint64_t rowCount = 1024;
+
+    std::unique_ptr<Writer> writer = createWriter(stripeSize,
+                                                  compressionBlockSize,
+                                                  CompressionKind_ZLIB,
+                                                  *type,
+                                                  pool,
+                                                  &memStream);
+
+    std::unique_ptr<ColumnVectorBatch> batch = 
writer->createRowBatch(rowCount);
+    StructVectorBatch * structBatch =
+      dynamic_cast<StructVectorBatch *>(batch.get());
+    LongVectorBatch * longBatch =
+      dynamic_cast<LongVectorBatch *>(structBatch->fields[0]);
+
+    for (uint64_t i = 0; i < rowCount; ++i) {
+      longBatch->data[i] = static_cast<int32_t>(i);
+    }
+    structBatch->numElements = rowCount;
+    longBatch->numElements = rowCount;
+
+    writer->add(*batch);
+    writer->close();
+
+    std::unique_ptr<InputStream> inStream(
+      new MemoryInputStream (memStream.getData(), memStream.getLength()));
+    std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream));
+    std::unique_ptr<RowReader> rowReader = createRowReader(reader.get());
+    EXPECT_EQ(rowCount, reader->getNumberOfRows());
+
+    batch = rowReader->createRowBatch(rowCount);
+    EXPECT_EQ(true, rowReader->next(*batch));
+
+    structBatch = dynamic_cast<StructVectorBatch *>(batch.get());
+    longBatch = dynamic_cast<LongVectorBatch *>(structBatch->fields[0]);
+    for (uint64_t i = 0; i < rowCount; ++i) {
+      EXPECT_EQ(static_cast<int32_t>(i), longBatch->data[i]);
+    }
+  }
+
+  TEST(Writer, writeTimestamp) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool* pool = getDefaultPool();
+    std::unique_ptr<Type> 
type(Type::buildTypeFromString("struct<col1:timestamp>"));
+
+    uint64_t stripeSize = 16 * 1024;
+    uint64_t compressionBlockSize = 1024;
+    uint64_t rowCount = 1024;
+
+    std::unique_ptr<Writer> writer = createWriter(stripeSize,
+                                                  compressionBlockSize,
+                                                  CompressionKind_ZLIB,
+                                                  *type,
+                                                  pool,
+                                                  &memStream);
+    std::unique_ptr<ColumnVectorBatch> batch = 
writer->createRowBatch(rowCount);
+    StructVectorBatch * structBatch =
+      dynamic_cast<StructVectorBatch *>(batch.get());
+    TimestampVectorBatch * tsBatch =
+      dynamic_cast<TimestampVectorBatch *>(structBatch->fields[0]);
+
+    std::vector<std::time_t> times(rowCount);
+    for (uint64_t i = 0; i < rowCount; ++i) {
+      time_t currTime = std::time(nullptr);
+      times[i] = static_cast<int64_t>(currTime) - static_cast<int64_t >(i * 
60);
+      tsBatch->data[i] = times[i];
+      tsBatch->nanoseconds[i] = static_cast<int64_t>(i * 1000);
+    }
+    structBatch->numElements = rowCount;
+    tsBatch->numElements = rowCount;
+
+    writer->add(*batch);
+    writer->close();
+
+    std::unique_ptr<InputStream> inStream(
+      new MemoryInputStream (memStream.getData(), memStream.getLength()));
+    std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream));
+    std::unique_ptr<RowReader> rowReader = createRowReader(reader.get());
+    EXPECT_EQ(rowCount, reader->getNumberOfRows());
+
+    batch = rowReader->createRowBatch(rowCount);
+    EXPECT_EQ(true, rowReader->next(*batch));
+
+    structBatch = dynamic_cast<StructVectorBatch *>(batch.get());
+    tsBatch = dynamic_cast<TimestampVectorBatch *>(structBatch->fields[0]);
+    for (uint64_t i = 0; i < rowCount; ++i) {
+      EXPECT_EQ(times[i], tsBatch->data[i]);
+      EXPECT_EQ(i * 1000, tsBatch->nanoseconds[i]);
+    }
+  }
+
+  TEST(Writer, writeCharAndVarcharColumn) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool * pool = getDefaultPool();
+    std::unique_ptr<Type> type(Type::buildTypeFromString(
+      "struct<col1:char(3),col2:varchar(4)>"));
+
+    uint64_t stripeSize = 1024;
+    uint64_t compressionBlockSize = 1024;
+    uint64_t rowCount = 65535;
+
+    char dataBuffer[327675];
+    uint64_t offset = 0;
+
+    std::unique_ptr<Writer> writer = createWriter(stripeSize,
+                                                  compressionBlockSize,
+                                                  CompressionKind_ZLIB,
+                                                  *type,
+                                                  pool,
+                                                  &memStream);
+
+    std::unique_ptr<ColumnVectorBatch> batch = 
writer->createRowBatch(rowCount);
+    StructVectorBatch * structBatch =
+      dynamic_cast<StructVectorBatch *>(batch.get());
+    StringVectorBatch * charBatch =
+      dynamic_cast<StringVectorBatch *>(structBatch->fields[0]);
+    StringVectorBatch * varcharBatch =
+      dynamic_cast<StringVectorBatch *>(structBatch->fields[1]);
+
+    for (uint64_t i = 0; i < rowCount; ++i) {
+      std::ostringstream os;
+      os << i;
+      charBatch->data[i] = dataBuffer + offset;
+      charBatch->length[i] = static_cast<int64_t>(os.str().size());
+
+      varcharBatch->data[i] = charBatch->data[i];
+      varcharBatch->length[i] = charBatch->length[i];
+
+      memcpy(dataBuffer + offset, os.str().c_str(), os.str().size());
+      offset += os.str().size();
+    }
+
+    structBatch->numElements = rowCount;
+    charBatch->numElements = rowCount;
+    varcharBatch->numElements = rowCount;
+
+    writer->add(*batch);
+    writer->close();
+
+    std::unique_ptr<InputStream> inStream(
+      new MemoryInputStream (memStream.getData(), memStream.getLength()));
+    std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream));
+    std::unique_ptr<RowReader> rowReader = createRowReader(reader.get());
+    EXPECT_EQ(rowCount, reader->getNumberOfRows());
+
+    batch = rowReader->createRowBatch(rowCount);
+    EXPECT_EQ(true, rowReader->next(*batch));
+    EXPECT_EQ(rowCount, batch->numElements);
+
+    for (uint64_t i = 0; i < rowCount; ++i) {
+      structBatch = dynamic_cast<StructVectorBatch *>(batch.get());
+      charBatch = dynamic_cast<StringVectorBatch *>(structBatch->fields[0]);
+      varcharBatch = dynamic_cast<StringVectorBatch *>(structBatch->fields[1]);
+
+      EXPECT_EQ(3, charBatch->length[i]);
+      EXPECT_FALSE(varcharBatch->length[i] > 4);
+
+      // test char data
+      std::string charsRead(
+        charBatch->data[i],
+        static_cast<size_t>(charBatch->length[i]));
+
+      std::ostringstream os;
+      os << i;
+      std::string charsExpected = os.str().substr(0, 3);
+      while (charsExpected.length() < 3) {
+        charsExpected += ' ';
+      }
+      EXPECT_EQ(charsExpected, charsRead);
+
+      // test varchar data
+      std::string varcharRead(
+        varcharBatch->data[i],
+        static_cast<size_t>(varcharBatch->length[i]));
+      std::string varcharExpected = os.str().substr(0, 4);
+      EXPECT_EQ(varcharRead, varcharExpected);
+    }
+
+    EXPECT_EQ(false, rowReader->next(*batch));
+  }
+
+  TEST(Writer, writeDecimal64Column) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool* pool = getDefaultPool();
+    std::unique_ptr<Type> type(Type::buildTypeFromString(
+      "struct<col1:decimal(16,5)>"));
+
+    uint64_t stripeSize = 16 * 1024; // 16K
+    uint64_t compressionBlockSize = 1024; // 1k
+    uint64_t rowCount = 1024;
+
+    std::unique_ptr<Writer> writer = createWriter(stripeSize,
+                                                  compressionBlockSize,
+                                                  CompressionKind_ZLIB,
+                                                  *type,
+                                                  pool,
+                                                  &memStream);
+    std::unique_ptr<ColumnVectorBatch> batch = 
writer->createRowBatch(rowCount);
+    StructVectorBatch * structBatch =
+      dynamic_cast<StructVectorBatch *>(batch.get());
+    Decimal64VectorBatch * decBatch =
+      dynamic_cast<Decimal64VectorBatch *>(structBatch->fields[0]);
+
+    // write positive decimals
+    for (uint64_t i = 0; i < rowCount; ++i) {
+      decBatch->values[i] = static_cast<int64_t>(i + 10000);
+    }
+    structBatch->numElements = decBatch->numElements = rowCount;
+    writer->add(*batch);
+
+    // write negative decimals
+    for (uint64_t i = 0; i < rowCount; ++i) {
+      decBatch->values[i] = static_cast<int64_t>(i - 10000);
+    }
+    structBatch->numElements = decBatch->numElements = rowCount;
+    writer->add(*batch);
+
+    writer->close();
+
+    std::unique_ptr<InputStream> inStream(
+      new MemoryInputStream (memStream.getData(), memStream.getLength()));
+    std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream));
+    std::unique_ptr<RowReader> rowReader = createRowReader(reader.get());
+    EXPECT_EQ(rowCount * 2, reader->getNumberOfRows());
+
+    // test reading positive decimals
+    batch = rowReader->createRowBatch(rowCount);
+    EXPECT_EQ(true, rowReader->next(*batch));
+    structBatch = dynamic_cast<StructVectorBatch *>(batch.get());
+    decBatch = dynamic_cast<Decimal64VectorBatch *>(structBatch->fields[0]);
+    for (uint64_t i = 0; i < rowCount; ++i) {
+      EXPECT_EQ(static_cast<int64_t>(i + 10000), decBatch->values[i]);
+    }
+
+    // test reading negative decimals
+    EXPECT_EQ(true, rowReader->next(*batch));
+    structBatch = dynamic_cast<StructVectorBatch *>(batch.get());
+    decBatch = dynamic_cast<Decimal64VectorBatch *>(structBatch->fields[0]);
+    for (uint64_t i = 0; i < rowCount; ++i) {
+      EXPECT_EQ(static_cast<int64_t>(i - 10000), decBatch->values[i]);
+    }
+  }
+
+  TEST(Writer, writeDecimal128Column) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool* pool = getDefaultPool();
+    std::unique_ptr<Type> type(Type::buildTypeFromString(
+      "struct<col1:decimal(30,10)>"));
+
+    uint64_t stripeSize = 16 * 1024;
+    uint64_t compressionBlockSize = 1024;
+    uint64_t rowCount = 1024;
+
+    std::unique_ptr<Writer> writer = createWriter(stripeSize,
+                                                  compressionBlockSize,
+                                                  CompressionKind_ZLIB,
+                                                  *type,
+                                                  pool,
+                                                  &memStream);
+    std::unique_ptr<ColumnVectorBatch> batch = 
writer->createRowBatch(rowCount);
+    StructVectorBatch * structBatch =
+      dynamic_cast<StructVectorBatch *>(batch.get());
+    Decimal128VectorBatch * decBatch =
+      dynamic_cast<Decimal128VectorBatch *>(structBatch->fields[0]);
+
+    // write positive decimals
+    std::string base = "1" + std::string(1, '0');
+    for (uint64_t i = 0; i < rowCount; ++i) {
+      std::ostringstream os;
+      os << i;
+      decBatch->values[i] = Int128(base + os.str());
+    }
+    structBatch->numElements = decBatch->numElements = rowCount;
+    writer->add(*batch);
+
+    // write negative decimals
+    std::string nbase = "-" + base;
+    for (uint64_t i = 0; i < rowCount; ++i) {
+      std::ostringstream os;
+      os << i;
+      decBatch->values[i] = Int128(nbase + os.str());
+    }
+    structBatch->numElements = rowCount;
+    decBatch->numElements = rowCount;
+    writer->add(*batch);
+    writer->close();
+
+    std::unique_ptr<InputStream> inStream(
+      new MemoryInputStream (memStream.getData(), memStream.getLength()));
+    std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream));
+    std::unique_ptr<RowReader> rowReader = createRowReader(reader.get());
+    EXPECT_EQ(rowCount * 2, reader->getNumberOfRows());
+
+    // test reading positive decimals
+    batch = rowReader->createRowBatch(rowCount);
+    EXPECT_EQ(true, rowReader->next(*batch));
+    structBatch = dynamic_cast<StructVectorBatch *>(batch.get());
+    decBatch = dynamic_cast<Decimal128VectorBatch *>(structBatch->fields[0]);
+    for (uint64_t i = 0; i < rowCount; ++i) {
+      std::ostringstream os;
+      os << i;
+      EXPECT_EQ(base + os.str(), decBatch->values[i].toString());
+    }
+
+    // test reading negative decimals and different scales
+    EXPECT_EQ(true, rowReader->next(*batch));
+    structBatch = dynamic_cast<StructVectorBatch *>(batch.get());
+    decBatch = dynamic_cast<Decimal128VectorBatch *>(structBatch->fields[0]);
+    for (uint64_t i = 0; i < rowCount; ++i) {
+      std::ostringstream os;
+      os << i;
+      EXPECT_EQ(nbase + os.str(), decBatch->values[i].toString());
+    }
+  }
+}

Reply via email to