[ 
https://issues.apache.org/jira/browse/ORC-178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16117103#comment-16117103
 ] 

ASF GitHub Bot commented on ORC-178:
------------------------------------

Github user omalley commented on a diff in the pull request:

    https://github.com/apache/orc/pull/128#discussion_r131738738
  
    --- Diff: c++/src/ColumnWriter.cc ---
    @@ -0,0 +1,507 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +#include "orc/Int128.hh"
    +#include "orc/Writer.hh"
    +
    +#include "ByteRLE.hh"
    +#include "ColumnWriter.hh"
    +#include "RLE.hh"
    +#include "Statistics.hh"
    +#include "Timezone.hh"
    +
    +namespace orc {
    +  StreamsFactory::~StreamsFactory() {
    +    //PASS
    +  }
    +
    +  class StreamsFactoryImpl : public StreamsFactory {
    +  public:
    +    StreamsFactoryImpl(
    +                       const WriterOptions& writerOptions,
    +                       OutputStream * outputStream) :
    +                       options(writerOptions),
    +                       outStream(outputStream) {
    +                       }
    +
    +    virtual std::unique_ptr<BufferedOutputStream>
    +                    createStream(proto::Stream_Kind kind) override;
    +  private:
    +    const WriterOptions& options;
    +    OutputStream * outStream;
    +  };
    +
    +  std::unique_ptr<BufferedOutputStream> StreamsFactoryImpl::createStream(
    +                                                      proto::Stream_Kind) {
    +    // In the future, we can decide compression strategy and modifier
    +    // based on stream kind. But for now we just use the setting from
    +    // WriterOption
    +    return createCompressor(
    +                            options.getCompression(),
    +                            outStream,
    +                            options.getCompressionStrategy(),
    +                            options.getBufferSize(),
    +                            options.getBlockSize(),
    +                            *options.getMemoryPool());
    +  }
    +
    +  std::unique_ptr<StreamsFactory> createStreamsFactory(
    +                                        const WriterOptions& options,
    +                                        OutputStream * outStream) {
    +    return std::unique_ptr<StreamsFactory>(
    +                                   new StreamsFactoryImpl(options, outStream));
    +  }
    +
    +  RowIndexPositionRecorder::~RowIndexPositionRecorder() {
    +    // PASS
    +  }
    +
    +  ColumnWriter::ColumnWriter(
    +                             const Type& type,
    +                             StreamsFactory& factory,
    +                             const WriterOptions& options) :
    +                                columnId(type.getColumnId()),
    +                                streamsFactory(factory),
    +                                colIndexStatistics(),
    +                                colStripeStatistics(),
    +                                colFileStatistics(),
    +                                enableIndex(options.getEnableIndex()),
    +                                enableStats(options.getEnableStats()),
    +                                rowIndex(),
    +                                rowIndexEntry(),
    +                                rowIndexPosition(),
    +                                memPool(*options.getMemoryPool()),
    +                                indexStream() {
    +
    +    std::unique_ptr<BufferedOutputStream> presentStream =
    +        factory.createStream(proto::Stream_Kind_PRESENT);
    +    notNullEncoder = createBooleanRleEncoder(std::move(presentStream));
    +
    +    if (enableIndex || enableStats) {
    +      bool enableStrCmp = options.getEnableStrStatsCmp();
    +      colIndexStatistics = createColumnStatistics(type, enableStrCmp);
    +      if (enableStats) {
    +        colStripeStatistics = createColumnStatistics(type, enableStrCmp);
    +        colFileStatistics = createColumnStatistics(type, enableStrCmp);
    +      }
    +    }
    +
    +    if (enableIndex) {
    +      rowIndex = std::unique_ptr<proto::RowIndex>(new proto::RowIndex());
    +      rowIndexEntry =
    +        std::unique_ptr<proto::RowIndexEntry>(new proto::RowIndexEntry());
    +      rowIndexPosition = std::unique_ptr<RowIndexPositionRecorder>(
    +                     new RowIndexPositionRecorder(*rowIndexEntry));
    +      indexStream =
    +        factory.createStream(proto::Stream_Kind_ROW_INDEX);
    +    }
    +  }
    +
    +  ColumnWriter::~ColumnWriter() {
    +    // PASS
    +  }
    +
    +  void ColumnWriter::add(ColumnVectorBatch& batch,
    +                         uint64_t offset,
    +                         uint64_t numValues) {
    +    notNullEncoder->add(batch.notNull.data() + offset, numValues, nullptr);
    +  }
    +
    +  void ColumnWriter::flush(std::vector<proto::Stream>& streams) {
    +    proto::Stream stream;
    +    stream.set_kind(proto::Stream_Kind_PRESENT);
    +    stream.set_column(static_cast<uint32_t>(columnId));
    +    stream.set_length(notNullEncoder->flush());
    +    streams.push_back(stream);
    +  }
    +
    +  uint64_t ColumnWriter::getEstimatedSize() const {
    +    return notNullEncoder->getBufferSize();
    +  }
    +
    +  void ColumnWriter::getStripeStatistics(
    +    std::vector<proto::ColumnStatistics>& stats) const {
    +    getProtoBufStatistics(stats, colStripeStatistics.get());
    +  }
    +
    +  void ColumnWriter::mergeStripeStatsIntoFileStats() {
    +    colFileStatistics->merge(*colStripeStatistics);
    +    colStripeStatistics->reset();
    +  }
    +
    +  void ColumnWriter::mergeRowGroupStatsIntoStripeStats() {
    +    colStripeStatistics->merge(*colIndexStatistics);
    +    colIndexStatistics->reset();
    +  }
    +
    +  void ColumnWriter::getFileStatistics(
    +    std::vector<proto::ColumnStatistics>& stats) const {
    +    getProtoBufStatistics(stats, colFileStatistics.get());
    +  }
    +
    +  void ColumnWriter::createRowIndexEntry() {
    +    proto::ColumnStatistics *indexStats = rowIndexEntry->mutable_statistics();
    +    colIndexStatistics->toProtoBuf(*indexStats);
    +
    +    *rowIndex->add_entry() = *rowIndexEntry;
    +
    +    rowIndexEntry->clear_positions();
    +    rowIndexEntry->clear_statistics();
    +
    +    if (enableStats) {
    +      colStripeStatistics->merge(*colIndexStatistics);
    +    }
    +    colIndexStatistics->reset();
    +
    +    recordPosition();
    +  }
    +
    +  void ColumnWriter::writeIndex(std::vector<proto::Stream> &streams) const {
    +    // write row index to output stream
    +    rowIndex->SerializeToZeroCopyStream(indexStream.get());
    +
    +    // construct row index stream
    +    proto::Stream stream;
    +    stream.set_kind(proto::Stream_Kind_ROW_INDEX);
    +    stream.set_column(static_cast<uint32_t>(columnId));
    +    stream.set_length(indexStream->flush());
    +    streams.push_back(stream);
    +  }
    +
    +  void ColumnWriter::recordPosition() const {
    +    notNullEncoder->recordPosition(rowIndexPosition.get());
    +  }
    +
    +  void ColumnWriter::resetIndex() {
    +    // clear row index
    +    rowIndex->clear_entry();
    +    rowIndexEntry->clear_positions();
    +    rowIndexEntry->clear_statistics();
    +
    +    // write current positions
    +    recordPosition();
    +  }
    +
    +  class StructColumnWriter : public ColumnWriter {
    +  public:
    +    StructColumnWriter(
    +                       const Type& type,
    +                       StreamsFactory& factory,
    +                       const WriterOptions& options);
    +    ~StructColumnWriter();
    +
    +    virtual void add(ColumnVectorBatch& rowBatch,
    +                     uint64_t offset,
    +                     uint64_t numValues) override;
    +
    +    virtual void flush(std::vector<proto::Stream>& streams) override;
    +
    +    virtual uint64_t getEstimatedSize() const override;
    +    virtual void getColumnEncoding(
    +      std::vector<proto::ColumnEncoding>& encodings) const override;
    +
    +    virtual void getStripeStatistics(
    +      std::vector<proto::ColumnStatistics>& stats) const override;
    +
    +    virtual void getFileStatistics(
    +      std::vector<proto::ColumnStatistics>& stats) const override;
    +
    +    virtual void mergeStripeStatsIntoFileStats() override;
    +
    +    virtual void mergeRowGroupStatsIntoStripeStats() override;
    +
    +    virtual void createRowIndexEntry() override;
    +
    +    virtual void writeIndex(
    +      std::vector<proto::Stream> &streams) const override;
    +
    +    virtual void resetIndex() override;
    +
    +  private:
    +    std::vector<ColumnWriter *> children;
    --- End diff --
    
    Ok.


> Implement Basic C++ Writer and Writer Option
> --------------------------------------------
>
>                 Key: ORC-178
>                 URL: https://issues.apache.org/jira/browse/ORC-178
>             Project: ORC
>          Issue Type: Sub-task
>          Components: C++
>            Reporter: Gang Wu
>            Assignee: Xiening Dai
>
> 1. write orc file header, file footer, postscript, etc.
> 2. write columns of all types 
> 3. write column statistics
> 4. write index stream in writer and reader seeks to row based on index 
> information 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to