[ https://issues.apache.org/jira/browse/ORC-178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16117103#comment-16117103 ]
ASF GitHub Bot commented on ORC-178: ------------------------------------ Github user omalley commented on a diff in the pull request: https://github.com/apache/orc/pull/128#discussion_r131738738 --- Diff: c++/src/ColumnWriter.cc --- @@ -0,0 +1,507 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "orc/Int128.hh" +#include "orc/Writer.hh" + +#include "ByteRLE.hh" +#include "ColumnWriter.hh" +#include "RLE.hh" +#include "Statistics.hh" +#include "Timezone.hh" + +namespace orc { + StreamsFactory::~StreamsFactory() { + //PASS + } + + class StreamsFactoryImpl : public StreamsFactory { + public: + StreamsFactoryImpl( + const WriterOptions& writerOptions, + OutputStream * outputStream) : + options(writerOptions), + outStream(outputStream) { + } + + virtual std::unique_ptr<BufferedOutputStream> + createStream(proto::Stream_Kind kind) override; + private: + const WriterOptions& options; + OutputStream * outStream; + }; + + std::unique_ptr<BufferedOutputStream> StreamsFactoryImpl::createStream( + proto::Stream_Kind) { + // In the future, we can decide compression strategy and modifier + // based on stream kind. But for now we just use the setting from + // WriterOption + return createCompressor( + options.getCompression(), + outStream, + options.getCompressionStrategy(), + options.getBufferSize(), + options.getBlockSize(), + *options.getMemoryPool()); + } + + std::unique_ptr<StreamsFactory> createStreamsFactory( + const WriterOptions& options, + OutputStream * outStream) { + return std::unique_ptr<StreamsFactory>( + new StreamsFactoryImpl(options, outStream)); + } + + RowIndexPositionRecorder::~RowIndexPositionRecorder() { + // PASS + } + + ColumnWriter::ColumnWriter( + const Type& type, + StreamsFactory& factory, + const WriterOptions& options) : + columnId(type.getColumnId()), + streamsFactory(factory), + colIndexStatistics(), + colStripeStatistics(), + colFileStatistics(), + enableIndex(options.getEnableIndex()), + enableStats(options.getEnableStats()), + rowIndex(), + rowIndexEntry(), + rowIndexPosition(), + memPool(*options.getMemoryPool()), + indexStream() { + + std::unique_ptr<BufferedOutputStream> presentStream = + factory.createStream(proto::Stream_Kind_PRESENT); + notNullEncoder = createBooleanRleEncoder(std::move(presentStream)); + + if (enableIndex || enableStats) { + bool enableStrCmp = options.getEnableStrStatsCmp(); + colIndexStatistics = createColumnStatistics(type, enableStrCmp); + if (enableStats) { + colStripeStatistics = createColumnStatistics(type, enableStrCmp); + colFileStatistics = createColumnStatistics(type, enableStrCmp); + } + } + + if (enableIndex) { + rowIndex = std::unique_ptr<proto::RowIndex>(new proto::RowIndex()); + rowIndexEntry = + std::unique_ptr<proto::RowIndexEntry>(new proto::RowIndexEntry()); + rowIndexPosition = std::unique_ptr<RowIndexPositionRecorder>( + new RowIndexPositionRecorder(*rowIndexEntry)); + indexStream = + factory.createStream(proto::Stream_Kind_ROW_INDEX); + } + } + + ColumnWriter::~ColumnWriter() { + // PASS + } + + void ColumnWriter::add(ColumnVectorBatch& batch, + uint64_t offset, + uint64_t numValues) { + notNullEncoder->add(batch.notNull.data() + offset, numValues, nullptr); + } + + void ColumnWriter::flush(std::vector<proto::Stream>& streams) { + proto::Stream stream; + stream.set_kind(proto::Stream_Kind_PRESENT); + stream.set_column(static_cast<uint32_t>(columnId)); + stream.set_length(notNullEncoder->flush()); + streams.push_back(stream); + } + + uint64_t ColumnWriter::getEstimatedSize() const { + return notNullEncoder->getBufferSize(); + } + + void ColumnWriter::getStripeStatistics( + std::vector<proto::ColumnStatistics>& stats) const { + getProtoBufStatistics(stats, colStripeStatistics.get()); + } + + void ColumnWriter::mergeStripeStatsIntoFileStats() { + colFileStatistics->merge(*colStripeStatistics); + colStripeStatistics->reset(); + } + + void ColumnWriter::mergeRowGroupStatsIntoStripeStats() { + colStripeStatistics->merge(*colIndexStatistics); + colIndexStatistics->reset(); + } + + void ColumnWriter::getFileStatistics( + std::vector<proto::ColumnStatistics>& stats) const { + getProtoBufStatistics(stats, colFileStatistics.get()); + } + + void ColumnWriter::createRowIndexEntry() { + proto::ColumnStatistics *indexStats = rowIndexEntry->mutable_statistics(); + colIndexStatistics->toProtoBuf(*indexStats); + + *rowIndex->add_entry() = *rowIndexEntry; + + rowIndexEntry->clear_positions(); + rowIndexEntry->clear_statistics(); + + if (enableStats) { + colStripeStatistics->merge(*colIndexStatistics); + } + colIndexStatistics->reset(); + + recordPosition(); + } + + void ColumnWriter::writeIndex(std::vector<proto::Stream> &streams) const { + // write row index to output stream + rowIndex->SerializeToZeroCopyStream(indexStream.get()); + + // construct row index stream + proto::Stream stream; + stream.set_kind(proto::Stream_Kind_ROW_INDEX); + stream.set_column(static_cast<uint32_t>(columnId)); + stream.set_length(indexStream->flush()); + streams.push_back(stream); + } + + void ColumnWriter::recordPosition() const { + notNullEncoder->recordPosition(rowIndexPosition.get()); + } + + void ColumnWriter::resetIndex() { + // clear row index + rowIndex->clear_entry(); + rowIndexEntry->clear_positions(); + rowIndexEntry->clear_statistics(); + + // write current positions + recordPosition(); + } + + class StructColumnWriter : public ColumnWriter { + public: + StructColumnWriter( + const Type& type, + StreamsFactory& factory, + const WriterOptions& options); + ~StructColumnWriter(); + + virtual void add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues) override; + + virtual void flush(std::vector<proto::Stream>& streams) override; + + virtual uint64_t getEstimatedSize() const override; + virtual void getColumnEncoding( + std::vector<proto::ColumnEncoding>& encodings) const override; + + virtual void getStripeStatistics( + std::vector<proto::ColumnStatistics>& stats) const override; + + virtual void getFileStatistics( + std::vector<proto::ColumnStatistics>& stats) const override; + + virtual void mergeStripeStatsIntoFileStats() override; + + virtual void mergeRowGroupStatsIntoStripeStats() override; + + virtual void createRowIndexEntry() override; + + virtual void writeIndex( + std::vector<proto::Stream> &streams) const override; + + virtual void resetIndex() override; + + private: + std::vector<ColumnWriter *> children; --- End diff -- Ok. > Implement Basic C++ Writer and Writer Option > -------------------------------------------- > > Key: ORC-178 > URL: https://issues.apache.org/jira/browse/ORC-178 > Project: ORC > Issue Type: Sub-task > Components: C++ > Reporter: Gang Wu > Assignee: Xiening Dai > > 1. write orc file header, file footer, postscript, etc. > 2. write columns of all types > 3. write column statistics > 4. write index stream in writer and reader seeks to row based on index > information -- This message was sent by Atlassian JIRA (v6.4.14#64029)