[ 
https://issues.apache.org/jira/browse/ORC-178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16041373#comment-16041373
 ] 

ASF GitHub Bot commented on ORC-178:
------------------------------------

Github user xndai commented on a diff in the pull request:

    https://github.com/apache/orc/pull/128#discussion_r120706989
  
    --- Diff: c++/src/Writer.cc ---
    @@ -0,0 +1,659 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +#include "orc/Common.hh"
    +#include "orc/OrcFile.hh"
    +
    +#include "ColumnWriter.hh"
    +#include "Timezone.hh"
    +
    +#include <memory>
    +
    +namespace orc {
    +
    +  /**
    +   * Backing storage (pimpl) for WriterOptions. Each field is initialized
    +   * to the value a freshly constructed WriterOptions reports.
    +   */
    +  struct WriterOptionsPrivate {
    +    uint64_t stripeSize = 64 * 1024 * 1024;   // 64M
    +    uint64_t blockSize = 256 * 1024;          // 256K
    +    uint64_t rowIndexStride = 10000;
    +    uint64_t bufferSize = 4 * 1024 * 1024;    // 4M
    +    bool blockPadding = false;
    +    CompressionKind compression = CompressionKind_ZLIB;
    +    EncodingStrategy encodingStrategy = EncodingStrategy_SPEED;
    +    CompressionStrategy compressionStrategy = CompressionStrategy_SPEED;
    +    MemoryPool* memoryPool = getDefaultPool();
    +    WriterVersion version = WriterVersion_ORC_135;
    +    double paddingTolerance = 0.0;
    +    std::ostream* errorStream = &std::cerr;
    +    RleVersion rleVersion = RleVersion_1;
    +    double dictionaryKeySizeThreshold = 0.0;
    +    bool enableStats = true;
    +    bool enableStrStatsCmp = false;
    +    bool enableIndex = true;
    +    const Timezone* timezone = &getLocalTimezone();
    +  };
    +
    +  // A new options object starts from the defaults held in
    +  // WriterOptionsPrivate.
    +  WriterOptions::WriterOptions()
    +      : privateBits(new WriterOptionsPrivate()) {
    +  }
    +
    +  // Deep copy: the new object owns its own copy of rhs's settings.
    +  WriterOptions::WriterOptions(const WriterOptions& rhs)
    +      : privateBits(new WriterOptionsPrivate(*rhs.privateBits)) {
    +  }
    +
    +  /**
    +   * Constructs from a non-const WriterOptions by taking over its settings
    +   * rather than copying them.
    +   *
    +   * NOTE: for any non-const lvalue, overload resolution prefers this
    +   * constructor over the const& copy constructor, so `WriterOptions b(a);`
    +   * moves a's settings into b. rhs is left holding a fresh set of defaults.
    +   * Leaving it with a null privateBits instead would make every later
    +   * getter/setter call on rhs dereference a null pointer.
    +   *
    +   * The new defaults are allocated before the swap, so if the allocation
    +   * throws, rhs is left untouched.
    +   */
    +  WriterOptions::WriterOptions(WriterOptions& rhs)
    +      : privateBits(new WriterOptionsPrivate()) {
    +    // swap privateBits with rhs: we take rhs's settings, rhs gets defaults
    +    privateBits.swap(rhs.privateBits);
    +  }
    +
    +  // Copy assignment: replaces this object's settings with a private copy
    +  // of rhs's settings. Self-assignment is a no-op.
    +  WriterOptions& WriterOptions::operator=(const WriterOptions& rhs) {
    +    if (this == &rhs) {
    +      return *this;
    +    }
    +    std::unique_ptr<WriterOptionsPrivate> copy(
    +        new WriterOptionsPrivate(*rhs.privateBits));
    +    privateBits.swap(copy);  // the old settings are released with `copy`
    +    return *this;
    +  }
    +
    +  // Defined here, where WriterOptionsPrivate is a complete type, so the
    +  // unique_ptr member can delete it.
    +  WriterOptions::~WriterOptions() {}
    +
    +  // Stripe size threshold: add() writes out a stripe once the column
    +  // writer's estimated size reaches this value (default 64M, presumably
    +  // bytes).
    +  uint64_t WriterOptions::getStripeSize() const {
    +    return privateBits->stripeSize;
    +  }
    +
    +  WriterOptions& WriterOptions::setStripeSize(uint64_t size) {
    +    privateBits->stripeSize = size;
    +    return *this;
    +  }
    +
    +  // Block size passed to the compressor and to the buffered output stream
    +  // when the writer is constructed (default 256K).
    +  uint64_t WriterOptions::getBlockSize() const {
    +    return privateBits->blockSize;
    +  }
    +
    +  WriterOptions& WriterOptions::setBlockSize(uint64_t size) {
    +    privateBits->blockSize = size;
    +    return *this;
    +  }
    +
    +  // Number of rows between row index entries when indexing is enabled
    +  // (default 10000).
    +  // NOTE(review): with indexing enabled, a stride of 0 makes add() compute
    +  // zero-row chunks forever. Consider rejecting or special-casing 0.
    +  uint64_t WriterOptions::getRowIndexStride() const {
    +    return privateBits->rowIndexStride;
    +  }
    +
    +  WriterOptions& WriterOptions::setRowIndexStride(uint64_t stride) {
    +    privateBits->rowIndexStride = stride;
    +    return *this;
    +  }
    +
    +  // Buffer size option (default 4M).
    +  // NOTE(review): the WriterImpl constructor hard-codes a 4M buffer capacity
    +  // and does not read this value. Confirm where it is meant to be used.
    +  uint64_t WriterOptions::getBufferSize() const {
    +    return privateBits->bufferSize;
    +  }
    +
    +  WriterOptions& WriterOptions::setBufferSize(uint64_t size) {
    +    privateBits->bufferSize = size;
    +    return *this;
    +  }
    +
    +  // Dictionary key size threshold (default 0.0).
    +  double WriterOptions::getDictionaryKeySizeThreshold() const {
    +    return privateBits->dictionaryKeySizeThreshold;
    +  }
    +
    +  WriterOptions& WriterOptions::setDictionaryKeySizeThreshold(double val) {
    +    privateBits->dictionaryKeySizeThreshold = val;
    +    return *this;
    +  }
    +
    +  // Block padding toggle (default false).
    +  bool WriterOptions::getBlockPadding() const {
    +    return privateBits->blockPadding;
    +  }
    +
    +  WriterOptions& WriterOptions::setBlockPadding(bool padding) {
    +    privateBits->blockPadding = padding;
    +    return *this;
    +  }
    +
    +  // RLE version to write (default RleVersion_1).
    +  RleVersion WriterOptions::getRleVersion() const {
    +    return privateBits->rleVersion;
    +  }
    +
    +  WriterOptions& WriterOptions::setRleVersion(RleVersion version) {
    +    privateBits->rleVersion = version;
    +    return *this;
    +  }
    +
    +  // Compression codec the writer passes to createCompressor()
    +  // (default CompressionKind_ZLIB).
    +  CompressionKind WriterOptions::getCompression() const {
    +    return privateBits->compression;
    +  }
    +
    +  WriterOptions& WriterOptions::setCompression(CompressionKind comp) {
    +    privateBits->compression = comp;
    +    return *this;
    +  }
    +
    +  // Encoding strategy (default EncodingStrategy_SPEED).
    +  EncodingStrategy WriterOptions::getEncodingStrategy() const {
    +    return privateBits->encodingStrategy;
    +  }
    +
    +  WriterOptions& WriterOptions::setEncodingStrategy(
    +      EncodingStrategy strategy) {
    +    privateBits->encodingStrategy = strategy;
    +    return *this;
    +  }
    +
    +  // Compression strategy the writer passes to createCompressor()
    +  // (default CompressionStrategy_SPEED).
    +  CompressionStrategy WriterOptions::getCompressionStrategy() const {
    +    return privateBits->compressionStrategy;
    +  }
    +
    +  WriterOptions& WriterOptions::setCompressionStrategy(
    +      CompressionStrategy strategy) {
    +    privateBits->compressionStrategy = strategy;
    +    return *this;
    +  }
    +
    +  // Writer version (default WriterVersion_ORC_135).
    +  WriterVersion WriterOptions::getWriterVersion() const {
    +    return privateBits->version;
    +  }
    +
    +  WriterOptions& WriterOptions::setWriterVersion(WriterVersion version) {
    +    privateBits->version = version;
    +    return *this;
    +  }
    +
    +  // Padding tolerance (default 0.0).
    +  double WriterOptions::getPaddingTolerance() const {
    +    return privateBits->paddingTolerance;
    +  }
    +
    +  WriterOptions& WriterOptions::setPaddingTolerance(double tolerance) {
    +    privateBits->paddingTolerance = tolerance;
    +    return *this;
    +  }
    +
    +  // Memory pool used by the writer's output streams and row batches
    +  // (default getDefaultPool()). The writer dereferences it, so it must
    +  // not be null. The pool is not owned by the options.
    +  MemoryPool* WriterOptions::getMemoryPool() const {
    +    return privateBits->memoryPool;
    +  }
    +
    +  WriterOptions& WriterOptions::setMemoryPool(MemoryPool* memoryPool) {
    +    privateBits->memoryPool = memoryPool;
    +    return *this;
    +  }
    +
    +  WriterOptions& WriterOptions::setErrorStream(std::ostream& errStream) {
    +    privateBits->errorStream = &errStream;
    +    return *this;
    +  }
    +
    +  std::ostream * WriterOptions::getErrorStream() const {
    +    return privateBits->errorStream;
    +  }
    +
    +  WriterOptions& WriterOptions::setEnableStats(bool enable) {
    +    privateBits->enableStats = enable;
    +    return *this;
    +  }
    +
    +  bool WriterOptions::getEnableStats() const {
    +    return privateBits->enableStats;
    +  }
    +
    +  WriterOptions& WriterOptions::setEnableStrStatsCmp(bool enable) {
    +    privateBits->enableStrStatsCmp = enable;
    +    return *this;
    +  }
    +
    +  bool WriterOptions::getEnableStrStatsCmp() const {
    +    return privateBits->enableStrStatsCmp;
    +  }
    +
    +  WriterOptions& WriterOptions::setEnableIndex(bool enable) {
    +    privateBits->enableIndex = enable;
    +    return *this;
    +  }
    +
    +  bool WriterOptions::getEnableIndex() const {
    +    return privateBits->enableIndex;
    +  }
    +
    +  const Timezone* WriterOptions::getTimezone() const {
    +    return privateBits->timezone;
    +  }
    +
    +  WriterOptions&  WriterOptions::setTimezone(const std::string& zone) {
    +    privateBits->timezone = &getTimezoneByName(zone);
    +    return *this;
    +  }
    +
    +  // Out-of-line definition of the Writer base-class destructor; there is
    +  // nothing to release at this level.
    +  Writer::~Writer() {}
    +
    +  /**
    +   * Concrete Writer. Batches passed to add() are forwarded to a ColumnWriter
    +   * built from the schema. The private Write*() helpers, defined outside
    +   * this excerpt, serialize stripes, metadata, footer and postscript.
    +   */
    +  class WriterImpl : public Writer {
    +  private:
    +    // Root of the column writer tree; every added batch goes through it.
    +    std::unique_ptr<ColumnWriter> columnWriter;
    +    std::unique_ptr<BufferedOutputStream> compressionStream;
    +    std::unique_ptr<BufferedOutputStream> bufferedStream;
    +    std::unique_ptr<StreamsFactory> streamsFactory;
    +    // Raw pointer to the destination; no visible code here deletes it.
    +    OutputStream* outStream;
    +    WriterOptions options;
    +    // Schema held by reference: the caller's Type must outlive the writer.
    +    const Type& type;
    +    uint64_t stripeRows;     // rows added toward the current stripe
    +    uint64_t totalRows;
    +    uint64_t indexRows;      // rows since the last row index entry
    +    uint64_t currentOffset;
    +    proto::Footer fileFooter;
    +    proto::PostScript postScript;
    +    proto::StripeInformation stripeInfo;
    +    proto::Metadata metadata;
    +
    +    static const char* magicId;
    +
    +  public:
    +    WriterImpl(const Type& type,
    +               OutputStream* stream,
    +               const WriterOptions& options);
    +
    +    std::unique_ptr<ColumnVectorBatch> createRowBatch(
    +        uint64_t size) const override;
    +
    +    void add(ColumnVectorBatch& rowsToAdd) override;
    +
    +    void close() override;
    +
    +  private:
    +    void Init();
    +    void InitStripe();
    +    void WriteStripe();
    +    void WriteMetadata();
    +    void WriteFileFooter();
    +    void WritePostscript();
    +    void BuildFooterType(const Type& t,
    +                         proto::Footer& footer,
    +                         uint32_t& index);
    +    static proto::CompressionKind convertCompressionKind(
    +        const CompressionKind& kind);
    +  };
    +
    +  // ORC magic string; presumably emitted by Init()/WritePostscript(),
    +  // which are outside this excerpt.
    +  const char* WriterImpl::magicId = "ORC";
    +
    +  /**
    +   * Builds the column writer tree for schema `t` and the compressed and
    +   * uncompressed output streams over `stream`, then calls Init().
    +   * `t` is kept by reference and `stream` by raw pointer.
    +   */
    +  WriterImpl::WriterImpl(const Type& t,
    +                         OutputStream* stream,
    +                         const WriterOptions& opts)
    +      : outStream(stream),
    +        options(opts),
    +        type(t),
    +        stripeRows(0),
    +        totalRows(0),
    +        indexRows(0),
    +        currentOffset(0) {
    +    streamsFactory = createStreamsFactory(options, outStream);
    +    columnWriter = buildWriter(type, *streamsFactory, options);
    +
    +    // NOTE(review): this capacity is hard-coded and does not consult
    +    // options.getBufferSize() (whose default is also 4M). Confirm intent.
    +    uint64_t bufferCapacity = 4 * 1024 * 1024; // 4M
    +    MemoryPool& pool = *options.getMemoryPool();
    +
    +    compressionStream = createCompressor(options.getCompression(),
    +                                         outStream,
    +                                         options.getCompressionStrategy(),
    +                                         bufferCapacity,
    +                                         options.getBlockSize(),
    +                                         pool);
    +    bufferedStream.reset(new BufferedOutputStream(pool,
    +                                                  outStream,
    +                                                  bufferCapacity,
    +                                                  options.getBlockSize()));
    +
    +    Init();
    +  }
    +
    +  // Creates a batch of `size` rows for this writer's schema, allocated from
    +  // the configured memory pool.
    +  std::unique_ptr<ColumnVectorBatch> WriterImpl::createRowBatch(
    +      uint64_t size) const {
    +    MemoryPool& pool = *options.getMemoryPool();
    +    return type.createRowBatch(size, pool);
    +  }
    +
    +  void WriterImpl::add(ColumnVectorBatch& rowsToAdd) {
    +    if (options.getEnableIndex()) {
    +      uint64_t pos = 0;
    +      uint64_t chunkSize = 0;
    +      uint64_t rowIndexStride = options.getRowIndexStride();
    +      while (pos < rowsToAdd.numElements) {
    +        chunkSize = std::min(rowsToAdd.numElements - pos,
    +                             rowIndexStride - indexRows);
    +        columnWriter->add(rowsToAdd, pos, chunkSize);
    +
    +        pos += chunkSize;
    +        indexRows += chunkSize;
    +        stripeRows += chunkSize;
    +
    +        if (indexRows >= rowIndexStride) {
    +          columnWriter->createRowIndexEntry();
    +          indexRows = 0;
    +        }
    +      }
    +    } else {
    +      stripeRows += rowsToAdd.numElements;
    +      columnWriter->add(rowsToAdd, 0, rowsToAdd.numElements);
    +    }
    +
    +    if (columnWriter->getEstimatedSize() >= options.getStripeSize()) {
    +      WriteStripe();
    +    }
    --- End diff --
    
    We won't throw an exception here. If the estimated size is smaller than the
    stripe size, it just means the data added so far is not large enough to form
    a stripe. We keep buffering the data in memory until either a subsequent add()
    satisfies this check, or Writer::close() forces any remaining data to be
    flushed as the last stripe of the file.


> Implement Basic C++ Writer and Writer Option
> --------------------------------------------
>
>                 Key: ORC-178
>                 URL: https://issues.apache.org/jira/browse/ORC-178
>             Project: ORC
>          Issue Type: Sub-task
>          Components: C++
>            Reporter: Gang Wu
>            Assignee: Xiening Dai
>
> 1. write orc file header, file footer, postscript, etc.
> 2. write columns of all types 
> 3. write column statistics
> 4. write index streams in the writer, and have the reader seek to a row based
> on the index information



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to