Repository: orc
Updated Branches:
  refs/heads/master 268fccb39 -> b82e2f4f8


http://git-wip-us.apache.org/repos/asf/orc/blob/b82e2f4f/c++/src/Writer.cc
----------------------------------------------------------------------
diff --git a/c++/src/Writer.cc b/c++/src/Writer.cc
new file mode 100644
index 0000000..3603f94
--- /dev/null
+++ b/c++/src/Writer.cc
@@ -0,0 +1,584 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "orc/Common.hh"
+#include "orc/OrcFile.hh"
+
+#include "ColumnWriter.hh"
+#include "Timezone.hh"
+
+#include <memory>
+
+namespace orc {
+
+  struct WriterOptionsPrivate {
+    uint64_t stripeSize;
+    uint64_t compressionBlockSize;
+    uint64_t rowIndexStride;
+    CompressionKind compression;
+    CompressionStrategy compressionStrategy;
+    MemoryPool* memoryPool;
+    double paddingTolerance;
+    std::ostream* errorStream;
+    FileVersion fileVersion;
+    double dictionaryKeySizeThreshold;
+    bool enableIndex;
+
+    WriterOptionsPrivate() :
+                            fileVersion(0, 11) { // default to Hive_0_11
+      stripeSize = 64 * 1024 * 1024; // 64M
+      compressionBlockSize = 64 * 1024; // 64K
+      rowIndexStride = 10000;
+      compression = CompressionKind_ZLIB;
+      compressionStrategy = CompressionStrategy_SPEED;
+      memoryPool = getDefaultPool();
+      paddingTolerance = 0.0;
+      errorStream = &std::cerr;
+      dictionaryKeySizeThreshold = 0.0;
+      enableIndex = true;
+    }
+  };
+
+  WriterOptions::WriterOptions():
+    privateBits(std::unique_ptr<WriterOptionsPrivate>
+                (new WriterOptionsPrivate())) {
+    // PASS
+  }
+
+  WriterOptions::WriterOptions(const WriterOptions& rhs):
+    privateBits(std::unique_ptr<WriterOptionsPrivate>
+                (new WriterOptionsPrivate(*(rhs.privateBits.get())))) {
+    // PASS
+  }
+
+  WriterOptions::WriterOptions(WriterOptions& rhs) {
+    // swap privateBits with rhs
+    WriterOptionsPrivate* l = privateBits.release();
+    privateBits.reset(rhs.privateBits.release());
+    rhs.privateBits.reset(l);
+  }
+
+  WriterOptions& WriterOptions::operator=(const WriterOptions& rhs) {
+    if (this != &rhs) {
+      privateBits.reset(new WriterOptionsPrivate(*(rhs.privateBits.get())));
+    }
+    return *this;
+  }
+
+  WriterOptions::~WriterOptions() {
+    // PASS
+  }
+
+  WriterOptions& WriterOptions::setStripeSize(uint64_t size) {
+    privateBits->stripeSize = size;
+    return *this;
+  }
+
+  uint64_t WriterOptions::getStripeSize() const {
+    return privateBits->stripeSize;
+  }
+
+  WriterOptions& WriterOptions::setCompressionBlockSize(uint64_t size) {
+    privateBits->compressionBlockSize = size;
+    return *this;
+  }
+
+  uint64_t WriterOptions::getCompressionBlockSize() const {
+    return privateBits->compressionBlockSize;
+  }
+
+  WriterOptions& WriterOptions::setRowIndexStride(uint64_t stride) {
+    privateBits->rowIndexStride = stride;
+    privateBits->enableIndex = (stride != 0);
+    return *this;
+  }
+
+  uint64_t WriterOptions::getRowIndexStride() const {
+    return privateBits->rowIndexStride;
+  }
+
+  WriterOptions& WriterOptions::setDictionaryKeySizeThreshold(double val) {
+    privateBits->dictionaryKeySizeThreshold = val;
+    return *this;
+  }
+
+  double WriterOptions::getDictionaryKeySizeThreshold() const {
+    return privateBits->dictionaryKeySizeThreshold;
+  }
+
+  WriterOptions& WriterOptions::setFileVersion(const FileVersion& version) {
+    // Accepts only 0.11 (Hive_0_11); any other version throws std::logic_error
+    if (version.getMajor() == 0 && version.getMinor() == 11) {
+      privateBits->fileVersion = version;
+      return *this;
+    }
+    throw std::logic_error("Unsupported file version specified.");
+  }
+
+  FileVersion WriterOptions::getFileVersion() const {
+    return privateBits->fileVersion;
+  }
+
+  WriterOptions& WriterOptions::setCompression(CompressionKind comp) {
+    privateBits->compression = comp;
+    return *this;
+  }
+
+  CompressionKind WriterOptions::getCompression() const {
+    return privateBits->compression;
+  }
+
+  WriterOptions& WriterOptions::setCompressionStrategy(
+    CompressionStrategy strategy) {
+    privateBits->compressionStrategy = strategy;
+    return *this;
+  }
+
+  CompressionStrategy WriterOptions::getCompressionStrategy() const {
+    return privateBits->compressionStrategy;
+  }
+
+  WriterOptions& WriterOptions::setPaddingTolerance(double tolerance) {
+    privateBits->paddingTolerance = tolerance;
+    return *this;
+  }
+
+  double WriterOptions::getPaddingTolerance() const {
+    return privateBits->paddingTolerance;
+  }
+
+  WriterOptions& WriterOptions::setMemoryPool(MemoryPool* memoryPool) {
+    privateBits->memoryPool = memoryPool;
+    return *this;
+  }
+
+  MemoryPool* WriterOptions::getMemoryPool() const {
+    return privateBits->memoryPool;
+  }
+
+  WriterOptions& WriterOptions::setErrorStream(std::ostream& errStream) {
+    privateBits->errorStream = &errStream;
+    return *this;
+  }
+
+  std::ostream* WriterOptions::getErrorStream() const {
+    return privateBits->errorStream;
+  }
+
+  bool WriterOptions::getEnableIndex() const {
+    return privateBits->enableIndex;
+  }
+
+  Writer::~Writer() {
+    // PASS
+  }
+
+  class WriterImpl : public Writer {
+  private:
+    std::unique_ptr<ColumnWriter> columnWriter;
+    std::unique_ptr<BufferedOutputStream> compressionStream;
+    std::unique_ptr<BufferedOutputStream> bufferedStream;
+    std::unique_ptr<StreamsFactory> streamsFactory;
+    OutputStream* outStream;
+    WriterOptions options;
+    const Type& type;
+    uint64_t stripeRows, totalRows, indexRows;
+    uint64_t currentOffset;
+    proto::Footer fileFooter;
+    proto::PostScript postScript;
+    proto::StripeInformation stripeInfo;
+    proto::Metadata metadata;
+
+    static const char* magicId;
+    static const uint32_t writerId;
+
+  public:
+    WriterImpl(
+               const Type& type,
+               OutputStream* stream,
+               const WriterOptions& options);
+
+    std::unique_ptr<ColumnVectorBatch> createRowBatch(uint64_t size)
+                                                            const override;
+
+    void add(ColumnVectorBatch& rowsToAdd) override;
+
+    void close() override;
+
+  private:
+    void init();
+    void initStripe();
+    void writeStripe();
+    void writeMetadata();
+    void writeFileFooter();
+    void writePostscript();
+    void buildFooterType(const Type& t, proto::Footer& footer, uint32_t& index);
+    static proto::CompressionKind convertCompressionKind(
+                                                  const CompressionKind& kind);
+  };
+
+  const char * WriterImpl::magicId = "ORC";
+
+  // Identification for C++ Orc writer
+  // 0 = ORC Java
+  // 1 = ORC C++
+  // 2 = Presto
+  const uint32_t WriterImpl::writerId = 1;
+
+  WriterImpl::WriterImpl(
+                         const Type& t,
+                         OutputStream* stream,
+                         const WriterOptions& opts) :
+                         outStream(stream),
+                         options(opts),
+                         type(t) {
+    streamsFactory = createStreamsFactory(options, outStream);
+    columnWriter = buildWriter(type, *streamsFactory, options);
+    stripeRows = totalRows = indexRows = 0;
+    currentOffset = 0;
+
+    // compression stream for stripe footer, file footer and metadata
+    compressionStream = createCompressor(
+                                  options.getCompression(),
+                                  outStream,
+                                  options.getCompressionStrategy(),
+                                  1 * 1024 * 1024, // buffer capacity: 1M
+                                  options.getCompressionBlockSize(),
+                                  *options.getMemoryPool());
+
+    // uncompressed stream for post script
+    bufferedStream.reset(new BufferedOutputStream(
+                                            *options.getMemoryPool(),
+                                            outStream,
+                                            1024, // buffer capacity: 1024 bytes
+                                            options.getCompressionBlockSize()));
+
+    init();
+  }
+
+  std::unique_ptr<ColumnVectorBatch> WriterImpl::createRowBatch(uint64_t size)
+                                                                         const {
+    return type.createRowBatch(size, *options.getMemoryPool());
+  }
+
+  void WriterImpl::add(ColumnVectorBatch& rowsToAdd) {
+    if (options.getEnableIndex()) {
+      uint64_t pos = 0;
+      uint64_t chunkSize = 0;
+      uint64_t rowIndexStride = options.getRowIndexStride();
+      while (pos < rowsToAdd.numElements) {
+        chunkSize = std::min(rowsToAdd.numElements - pos,
+                             rowIndexStride - indexRows);
+        columnWriter->add(rowsToAdd, pos, chunkSize);
+
+        pos += chunkSize;
+        indexRows += chunkSize;
+        stripeRows += chunkSize;
+
+        if (indexRows >= rowIndexStride) {
+          columnWriter->createRowIndexEntry();
+          indexRows = 0;
+        }
+      }
+    } else {
+      stripeRows += rowsToAdd.numElements;
+      columnWriter->add(rowsToAdd, 0, rowsToAdd.numElements);
+    }
+
+    if (columnWriter->getEstimatedSize() >= options.getStripeSize()) {
+      writeStripe();
+    }
+  }
+
+  void WriterImpl::close() {
+    if (stripeRows > 0) {
+      writeStripe();
+    }
+    writeMetadata();
+    writeFileFooter();
+    writePostscript();
+    outStream->close();
+  }
+
+  void WriterImpl::init() {
+    // Write file header
+    outStream->write(WriterImpl::magicId, strlen(WriterImpl::magicId));
+    currentOffset += strlen(WriterImpl::magicId);
+
+    // Initialize file footer
+    fileFooter.set_headerlength(currentOffset);
+    fileFooter.set_contentlength(0);
+    fileFooter.set_numberofrows(0);
+    fileFooter.set_rowindexstride(
+                          static_cast<uint32_t>(options.getRowIndexStride()));
+    fileFooter.set_writer(writerId);
+
+    uint32_t index = 0;
+    buildFooterType(type, fileFooter, index);
+
+    // Initialize post script
+    postScript.set_footerlength(0);
+    postScript.set_compression(
+                  WriterImpl::convertCompressionKind(options.getCompression()));
+    postScript.set_compressionblocksize(options.getCompressionBlockSize());
+
+    postScript.add_version(options.getFileVersion().getMajor());
+    postScript.add_version(options.getFileVersion().getMinor());
+
+    postScript.set_writerversion(WriterVersion_ORC_135);
+    postScript.set_magic("ORC");
+
+    // Initialize first stripe
+    initStripe();
+  }
+
+  void WriterImpl::initStripe() {
+    stripeInfo.set_offset(currentOffset);
+    stripeInfo.set_indexlength(0);
+    stripeInfo.set_datalength(0);
+    stripeInfo.set_footerlength(0);
+    stripeInfo.set_numberofrows(0);
+
+    stripeRows = indexRows = 0;
+  }
+
+  void WriterImpl::writeStripe() {
+    if (options.getEnableIndex() && indexRows != 0) {
+      columnWriter->createRowIndexEntry();
+      indexRows = 0;
+    } else {
+      columnWriter->mergeRowGroupStatsIntoStripeStats();
+    }
+
+    std::vector<proto::Stream> streams;
+    // write ROW_INDEX streams
+    if (options.getEnableIndex()) {
+      columnWriter->writeIndex(streams);
+    }
+    // write streams like PRESENT, DATA, etc.
+    columnWriter->flush(streams);
+
+    // only until all streams are flushed can we reset positions
+    if (options.getEnableIndex()) {
+      columnWriter->resetIndex();
+    }
+
+    // generate and write stripe footer
+    proto::StripeFooter stripeFooter;
+    for (uint32_t i = 0; i < streams.size(); ++i) {
+      *stripeFooter.add_streams() = streams[i];
+    }
+
+    std::vector<proto::ColumnEncoding> encodings;
+    columnWriter->getColumnEncoding(encodings);
+
+    for (uint32_t i = 0; i < encodings.size(); ++i) {
+      *stripeFooter.add_columns() = encodings[i];
+    }
+
+    // TODO: ORC-205 Include writer timezone in stripe footer
+
+    // add stripe statistics to metadata
+    proto::StripeStatistics* stripeStats = metadata.add_stripestats();
+    std::vector<proto::ColumnStatistics> colStats;
+    columnWriter->getStripeStatistics(colStats);
+    for (uint32_t i = 0; i != colStats.size(); ++i) {
+      *stripeStats->add_colstats() = colStats[i];
+    }
+    // merge stripe stats into file stats and clear stripe stats
+    columnWriter->mergeStripeStatsIntoFileStats();
+
+    if (!stripeFooter.SerializeToZeroCopyStream(compressionStream.get())) {
+      throw std::logic_error("Failed to write stripe footer.");
+    }
+    uint64_t footerLength = compressionStream->flush();
+
+    // calculate data length and index length
+    uint64_t dataLength = 0;
+    uint64_t indexLength = 0;
+    for (uint32_t i = 0; i < streams.size(); ++i) {
+      if (streams[i].kind() == proto::Stream_Kind_ROW_INDEX) {
+        indexLength += streams[i].length();
+      } else {
+        dataLength += streams[i].length();
+      }
+    }
+
+    // update stripe info
+    stripeInfo.set_indexlength(indexLength);
+    stripeInfo.set_datalength(dataLength);
+    stripeInfo.set_footerlength(footerLength);
+    stripeInfo.set_numberofrows(stripeRows);
+
+    *fileFooter.add_stripes() = stripeInfo;
+
+    currentOffset = currentOffset + indexLength + dataLength + footerLength;
+    totalRows += stripeRows;
+
+    initStripe();
+  }
+
+  void WriterImpl::writeMetadata() {
+    if (!metadata.SerializeToZeroCopyStream(compressionStream.get())) {
+      throw std::logic_error("Failed to write metadata.");
+    }
+    postScript.set_metadatalength(compressionStream.get()->flush());
+  }
+
+  void WriterImpl::writeFileFooter() {
+    fileFooter.set_contentlength(currentOffset - fileFooter.headerlength());
+    fileFooter.set_numberofrows(totalRows);
+
+    // update file statistics
+    std::vector<proto::ColumnStatistics> colStats;
+    columnWriter->getFileStatistics(colStats);
+    for (uint32_t i = 0; i != colStats.size(); ++i) {
+      *fileFooter.add_statistics() = colStats[i];
+    }
+
+    if (!fileFooter.SerializeToZeroCopyStream(compressionStream.get())) {
+      throw std::logic_error("Failed to write file footer.");
+    }
+    postScript.set_footerlength(compressionStream->flush());
+  }
+
+  void WriterImpl::writePostscript() {
+    if (!postScript.SerializeToZeroCopyStream(bufferedStream.get())) {
+      throw std::logic_error("Failed to write post script.");
+    }
+    unsigned char psLength =
+                      static_cast<unsigned char>(bufferedStream->flush());
+    outStream->write(&psLength, sizeof(unsigned char));
+  }
+
+  void WriterImpl::buildFooterType(
+                                   const Type& t,
+                                   proto::Footer& footer,
+                                   uint32_t & index) {
+    proto::Type protoType;
+    protoType.set_maximumlength(static_cast<uint32_t>(t.getMaximumLength()));
+    protoType.set_precision(static_cast<uint32_t>(t.getPrecision()));
+    protoType.set_scale(static_cast<uint32_t>(t.getScale()));
+
+    switch (t.getKind()) {
+    case BOOLEAN: {
+      protoType.set_kind(proto::Type_Kind_BOOLEAN);
+      break;
+    }
+    case BYTE: {
+      protoType.set_kind(proto::Type_Kind_BYTE);
+      break;
+    }
+    case SHORT: {
+      protoType.set_kind(proto::Type_Kind_SHORT);
+      break;
+    }
+    case INT: {
+      protoType.set_kind(proto::Type_Kind_INT);
+      break;
+    }
+    case LONG: {
+      protoType.set_kind(proto::Type_Kind_LONG);
+      break;
+    }
+    case FLOAT: {
+      protoType.set_kind(proto::Type_Kind_FLOAT);
+      break;
+    }
+    case DOUBLE: {
+      protoType.set_kind(proto::Type_Kind_DOUBLE);
+      break;
+    }
+    case STRING: {
+      protoType.set_kind(proto::Type_Kind_STRING);
+      break;
+    }
+    case BINARY: {
+      protoType.set_kind(proto::Type_Kind_BINARY);
+      break;
+    }
+    case TIMESTAMP: {
+      protoType.set_kind(proto::Type_Kind_TIMESTAMP);
+      break;
+    }
+    case LIST: {
+      protoType.set_kind(proto::Type_Kind_LIST);
+      break;
+    }
+    case MAP: {
+      protoType.set_kind(proto::Type_Kind_MAP);
+      break;
+    }
+    case STRUCT: {
+      protoType.set_kind(proto::Type_Kind_STRUCT);
+      break;
+    }
+    case UNION: {
+      protoType.set_kind(proto::Type_Kind_UNION);
+      break;
+    }
+    case DECIMAL: {
+      protoType.set_kind(proto::Type_Kind_DECIMAL);
+      break;
+    }
+    case DATE: {
+      protoType.set_kind(proto::Type_Kind_DATE);
+      break;
+    }
+    case VARCHAR: {
+      protoType.set_kind(proto::Type_Kind_VARCHAR);
+      break;
+    }
+    case CHAR: {
+      protoType.set_kind(proto::Type_Kind_CHAR);
+      break;
+    }
+    default:
+      throw std::logic_error("Unknown type.");
+    }
+
+    int pos = static_cast<int>(index);
+    *footer.add_types() = protoType;
+
+    for (uint64_t i = 0; i < t.getSubtypeCount(); ++i) {
+      if (t.getKind() != LIST && t.getKind() != MAP && t.getKind() != UNION) {
+        footer.mutable_types(pos)->add_fieldnames(t.getFieldName(i));
+      }
+      footer.mutable_types(pos)->add_subtypes(++index);
+      buildFooterType(*t.getSubtype(i), footer, index);
+    }
+  }
+
+  proto::CompressionKind WriterImpl::convertCompressionKind(
+                                      const CompressionKind& kind) {
+    return static_cast<proto::CompressionKind>(kind);
+  }
+
+  std::unique_ptr<Writer> createWriter(
+                                       const Type& type,
+                                       OutputStream* stream,
+                                      const WriterOptions& options) {
+    return std::unique_ptr<Writer>(
+                                   new WriterImpl(
+                                            type,
+                                            stream,
+                                            options));
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/orc/blob/b82e2f4f/c++/src/io/OutputStream.cc
----------------------------------------------------------------------
diff --git a/c++/src/io/OutputStream.cc b/c++/src/io/OutputStream.cc
index fa2de30..5fc8187 100644
--- a/c++/src/io/OutputStream.cc
+++ b/c++/src/io/OutputStream.cc
@@ -23,6 +23,10 @@
 
 namespace orc {
 
+  PositionRecorder::~PositionRecorder() {
+    // PASS
+  }
+
   BufferedOutputStream::BufferedOutputStream(
                                     MemoryPool& pool,
                                     OutputStream * outStream,

http://git-wip-us.apache.org/repos/asf/orc/blob/b82e2f4f/c++/test/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/c++/test/CMakeLists.txt b/c++/test/CMakeLists.txt
index 4691895..ea7ac6d 100644
--- a/c++/test/CMakeLists.txt
+++ b/c++/test/CMakeLists.txt
@@ -43,6 +43,7 @@ add_executable (orc-test
   TestTimestampStatistics.cc
   TestTimezone.cc
   TestType.cc
+  TestWriter.cc
 )
 
 target_link_libraries (orc-test

http://git-wip-us.apache.org/repos/asf/orc/blob/b82e2f4f/c++/test/TestColumnStatistics.cc
----------------------------------------------------------------------
diff --git a/c++/test/TestColumnStatistics.cc b/c++/test/TestColumnStatistics.cc
index 32f8e27..24ad394 100644
--- a/c++/test/TestColumnStatistics.cc
+++ b/c++/test/TestColumnStatistics.cc
@@ -173,26 +173,9 @@ namespace orc {
     EXPECT_TRUE(std::abs(971341.3203 - dblStats->getSum()) < 0.00001);
   }
 
-  TEST(ColumnStatistics, stringColumnStatisticsWithNoStringComparison) {
+  TEST(ColumnStatistics, stringColumnStatistics) {
     std::unique_ptr<StringColumnStatisticsImpl> strStats(
-      new StringColumnStatisticsImpl(false));
-
-    EXPECT_FALSE(strStats->hasMinimum());
-    EXPECT_FALSE(strStats->hasMaximum());
-    EXPECT_EQ(0, strStats->getNumberOfValues());
-    EXPECT_TRUE(strStats->hasTotalLength());
-    EXPECT_EQ(0, strStats->getTotalLength());
-
-    strStats->update("string");
-    EXPECT_FALSE(strStats->hasMinimum());
-    EXPECT_FALSE(strStats->hasMaximum());
-    EXPECT_TRUE(strStats->hasTotalLength());
-    EXPECT_EQ(6, strStats->getTotalLength());
-  }
-
-  TEST(ColumnStatistics, stringColumnStatisticsWithStringComparison) {
-    std::unique_ptr<StringColumnStatisticsImpl> strStats(
-      new StringColumnStatisticsImpl(true));
+      new StringColumnStatisticsImpl());
 
     EXPECT_FALSE(strStats->hasMinimum());
     EXPECT_FALSE(strStats->hasMaximum());

http://git-wip-us.apache.org/repos/asf/orc/blob/b82e2f4f/c++/test/TestWriter.cc
----------------------------------------------------------------------
diff --git a/c++/test/TestWriter.cc b/c++/test/TestWriter.cc
new file mode 100644
index 0000000..79a1d2d
--- /dev/null
+++ b/c++/test/TestWriter.cc
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "orc/ColumnPrinter.hh"
+#include "orc/OrcFile.hh"
+
+#include "MemoryInputStream.hh"
+#include "MemoryOutputStream.hh"
+
+#include "wrap/gmock.h"
+#include "wrap/gtest-wrapper.h"
+
+#include <cmath>
+#include <ctime>
+#include <sstream>
+
+namespace orc {
+
+  const int DEFAULT_MEM_STREAM_SIZE = 10 * 1024 * 1024; // 10M
+
+  std::unique_ptr<Writer> createWriter(
+                                      uint64_t stripeSize,
+                                      uint64_t compresionblockSize,
+                                      CompressionKind compression,
+                                      const Type& type,
+                                      MemoryPool* memoryPool,
+                                      OutputStream* stream,
+                                      FileVersion version = FileVersion(0, 11)){
+    WriterOptions options;
+    options.setStripeSize(stripeSize);
+    options.setCompressionBlockSize(compresionblockSize);
+    options.setCompression(compression);
+    options.setMemoryPool(memoryPool);
+    options.setRowIndexStride(0);
+    options.setFileVersion(version);
+    return createWriter(type, stream, options);
+  }
+
+  std::unique_ptr<Reader> createReader(
+                                      MemoryPool * memoryPool,
+                                      std::unique_ptr<InputStream> stream) {
+    ReaderOptions options;
+    options.setMemoryPool(*memoryPool);
+    return createReader(std::move(stream), options);
+  }
+
+  std::unique_ptr<RowReader> createRowReader(Reader* reader) {
+    RowReaderOptions rowReaderOpts;
+    return reader->createRowReader(rowReaderOpts);
+  }
+
+  TEST(Writer, writeEmptyFile) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool* pool = getDefaultPool();
+    ORC_UNIQUE_PTR<Type> type(Type::buildTypeFromString("struct<col1:int>"));
+
+    uint64_t stripeSize = 16 * 1024; // 16K
+    uint64_t compressionBlockSize = 1024; // 1k
+
+    std::unique_ptr<Writer> writer = createWriter(
+                                      stripeSize,
+                                      compressionBlockSize,
+                                      CompressionKind_ZLIB,
+                                      *type,
+                                      pool,
+                                      &memStream);
+    writer->close();
+
+    std::unique_ptr<InputStream> inStream(
+            new MemoryInputStream (memStream.getData(), memStream.getLength()));
+    std::unique_ptr<Reader> reader = createReader(
+                                                pool,
+                                                std::move(inStream));
+    std::unique_ptr<RowReader> rowReader = createRowReader(reader.get());
+    EXPECT_EQ(FileVersion(0, 11), reader->getFormatVersion());
+    EXPECT_EQ("0.11", reader->getFormatVersion().toString());
+    EXPECT_EQ(WriterVersion_ORC_135, reader->getWriterVersion());
+    EXPECT_EQ(0, reader->getNumberOfRows());
+
+    std::unique_ptr<ColumnVectorBatch> batch = rowReader->createRowBatch(1024);
+    EXPECT_FALSE(rowReader->next(*batch));
+  }
+
+  TEST(Writer, writeIntFileOneStripe) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool* pool = getDefaultPool();
+    ORC_UNIQUE_PTR<Type> type(Type::buildTypeFromString("struct<col1:int>"));
+
+    uint64_t stripeSize = 16 * 1024; // 16K
+    uint64_t compressionBlockSize = 1024; // 1k
+
+    std::unique_ptr<Writer> writer = createWriter(
+                                      stripeSize,
+                                      compressionBlockSize,
+                                      CompressionKind_ZLIB,
+                                      *type,
+                                      pool,
+                                      &memStream,
+                                      FileVersion(0, 11));
+    std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(1024);
+    StructVectorBatch* structBatch =
+      dynamic_cast<StructVectorBatch *>(batch.get());
+    LongVectorBatch* longBatch =
+      dynamic_cast<LongVectorBatch *>(structBatch->fields[0]);
+
+    for (uint64_t i = 0; i < 1024; ++i) {
+      longBatch->data[i] = static_cast<int64_t>(i);
+    }
+    structBatch->numElements = 1024;
+    longBatch->numElements = 1024;
+
+    writer->add(*batch);
+
+    for (uint64_t i = 1024; i < 2000; ++i) {
+      longBatch->data[i - 1024] = static_cast<int64_t>(i);
+    }
+    structBatch->numElements = 2000 - 1024;
+    longBatch->numElements = 2000 - 1024;
+
+    writer->add(*batch);
+    writer->close();
+
+    std::unique_ptr<InputStream> inStream(
+            new MemoryInputStream (memStream.getData(), memStream.getLength()));
+    std::unique_ptr<Reader> reader = createReader(
+                                                pool,
+                                                std::move(inStream));
+    std::unique_ptr<RowReader> rowReader = createRowReader(reader.get());
+    EXPECT_EQ(2000, reader->getNumberOfRows());
+
+    batch = rowReader->createRowBatch(2048);
+    EXPECT_TRUE(rowReader->next(*batch));
+    EXPECT_EQ(2000, batch->numElements);
+    EXPECT_FALSE(rowReader->next(*batch));
+
+    for (uint64_t i = 0; i < 2000; ++i) {
+      structBatch = dynamic_cast<StructVectorBatch *>(batch.get());
+      longBatch = dynamic_cast<LongVectorBatch *>(structBatch->fields[0]);
+      EXPECT_EQ(i, longBatch->data[i]);
+    }
+  }
+
+  TEST(Writer, writeIntFileMultipleStripes) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool* pool = getDefaultPool();
+    ORC_UNIQUE_PTR<Type> type(Type::buildTypeFromString("struct<col1:int>"));
+
+    uint64_t stripeSize = 1024; // 1K
+    uint64_t compressionBlockSize = 1024; // 1k
+
+    std::unique_ptr<Writer> writer = createWriter(
+                                      stripeSize,
+                                      compressionBlockSize,
+                                      CompressionKind_ZLIB,
+                                      *type,
+                                      pool,
+                                      &memStream);
+    std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(65535);
+    StructVectorBatch* structBatch =
+      dynamic_cast<StructVectorBatch*>(batch.get());
+    LongVectorBatch* longBatch =
+      dynamic_cast<LongVectorBatch*>(structBatch->fields[0]);
+
+    for (uint64_t j = 0; j < 10; ++j) {
+      for (uint64_t i = 0; i < 65535; ++i) {
+        longBatch->data[i] = static_cast<int64_t>(i);
+      }
+      structBatch->numElements = 65535;
+      longBatch->numElements = 65535;
+
+      writer->add(*batch);
+    }
+
+    writer->close();
+
+    std::unique_ptr<InputStream> inStream(
+            new MemoryInputStream (memStream.getData(), memStream.getLength()));
+    std::unique_ptr<Reader> reader = createReader(
+                                                pool,
+                                                std::move(inStream));
+    std::unique_ptr<RowReader> rowReader = createRowReader(reader.get());
+    EXPECT_EQ(655350, reader->getNumberOfRows());
+
+    batch = rowReader->createRowBatch(65535);
+    for (uint64_t j = 0; j < 10; ++j) {
+      EXPECT_TRUE(rowReader->next(*batch));
+      EXPECT_EQ(65535, batch->numElements);
+
+      for (uint64_t i = 0; i < 65535; ++i) {
+        structBatch = dynamic_cast<StructVectorBatch*>(batch.get());
+        longBatch = dynamic_cast<LongVectorBatch*>(structBatch->fields[0]);
+        EXPECT_EQ(i, longBatch->data[i]);
+      }
+    }
+    EXPECT_FALSE(rowReader->next(*batch));
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/orc/blob/b82e2f4f/tools/src/FileMetadata.cc
----------------------------------------------------------------------
diff --git a/tools/src/FileMetadata.cc b/tools/src/FileMetadata.cc
index 68ca208..58fe8a2 100644
--- a/tools/src/FileMetadata.cc
+++ b/tools/src/FileMetadata.cc
@@ -102,7 +102,7 @@ void printMetadata(std::ostream & out, const char*filename, bool verbose) {
   out << "  \"rows\": " << reader->getNumberOfRows() << ",\n";
   uint64_t stripeCount = reader->getNumberOfStripes();
   out << "  \"stripe count\": " << stripeCount << ",\n";
-  out << "  \"format\": \"" << reader->getFormatVersion()
+  out << "  \"format\": \"" << reader->getFormatVersion().toString()
       << "\", \"writer version\": \""
             << orc::writerVersionToString(reader->getWriterVersion())
             << "\",\n";

http://git-wip-us.apache.org/repos/asf/orc/blob/b82e2f4f/tools/test/TestMatch.cc
----------------------------------------------------------------------
diff --git a/tools/test/TestMatch.cc b/tools/test/TestMatch.cc
index 5bf7a40..fc4f6f9 100644
--- a/tools/test/TestMatch.cc
+++ b/tools/test/TestMatch.cc
@@ -110,7 +110,7 @@ namespace orc {
     EXPECT_EQ(GetParam().rowCount, reader->getNumberOfRows());
     EXPECT_EQ(GetParam().rowIndexStride, reader->getRowIndexStride());
     EXPECT_EQ(GetParam().contentLength, reader->getContentLength());
-    EXPECT_EQ(GetParam().formatVersion, reader->getFormatVersion());
+    EXPECT_EQ(GetParam().formatVersion, reader->getFormatVersion().toString());
     EXPECT_EQ(getFilename(), reader->getStreamName());
     EXPECT_EQ(GetParam().userMeta.size(), reader->getMetadataKeys().size());
     for(std::map<std::string, std::string>::const_iterator itr =
@@ -926,7 +926,7 @@ TEST(TestMatch, futureFormatVersion) {
   EXPECT_EQ(("Warning: ORC file " + filename +
              " was written in an unknown format version 19.99\n"),
             errorMsg.str());
-  EXPECT_EQ("19.99", reader->getFormatVersion());
+  EXPECT_EQ("19.99", reader->getFormatVersion().toString());
 }
 
 TEST(TestMatch, selectColumns) {

Reply via email to