[
https://issues.apache.org/jira/browse/ORC-178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16041373#comment-16041373
]
ASF GitHub Bot commented on ORC-178:
------------------------------------
Github user xndai commented on a diff in the pull request:
https://github.com/apache/orc/pull/128#discussion_r120706989
--- Diff: c++/src/Writer.cc ---
@@ -0,0 +1,659 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "orc/Common.hh"
+#include "orc/OrcFile.hh"
+
+#include "ColumnWriter.hh"
+#include "Timezone.hh"
+
+#include <memory>
+
+namespace orc {
+
+ /**
+  * Backing storage for WriterOptions (pimpl). Every field carries its
+  * default as an in-class initializer, so a value-initialized instance
+  * holds the writer defaults, and the implicit memberwise copy constructor
+  * (used when WriterOptions is copied) duplicates every setting.
+  */
+ struct WriterOptionsPrivate {
+   uint64_t stripeSize = 64 * 1024 * 1024;     // 64M
+   uint64_t blockSize = 256 * 1024;            // 256K
+   uint64_t rowIndexStride = 10000;
+   uint64_t bufferSize = 4 * 1024 * 1024;      // 4M
+   bool blockPadding = false;
+   CompressionKind compression = CompressionKind_ZLIB;
+   EncodingStrategy encodingStrategy = EncodingStrategy_SPEED;
+   CompressionStrategy compressionStrategy = CompressionStrategy_SPEED;
+   MemoryPool *memoryPool = getDefaultPool();
+   WriterVersion version = WriterVersion_ORC_135;
+   double paddingTolerance = 0.0;
+   std::ostream* errorStream = &std::cerr;
+   RleVersion rleVersion = RleVersion_1;
+   double dictionaryKeySizeThreshold = 0.0;
+   bool enableStats = true;
+   bool enableStrStatsCmp = false;
+   bool enableIndex = true;
+   const Timezone* timezone = &getLocalTimezone();
+ };
+
+ /** Creates options holding the defaults defined by WriterOptionsPrivate. */
+ WriterOptions::WriterOptions()
+     : privateBits(new WriterOptionsPrivate()) {
+ }
+
+ /** Deep-copies every setting of `rhs` into a freshly allocated private block. */
+ WriterOptions::WriterOptions(const WriterOptions& rhs)
+     : privateBits(new WriterOptionsPrivate(*rhs.privateBits)) {
+ }
+
+ /**
+  * Copy constructor for non-const sources.
+  *
+  * Overload resolution prefers this signature over
+  * WriterOptions(const WriterOptions&) whenever the source is a non-const
+  * lvalue, so an ordinary `WriterOptions b(a);` lands here. It must
+  * therefore leave `rhs` intact.
+  *
+  * Do not swap or steal the pointer instead: this object's privateBits is
+  * still empty at this point, so a swap would leave rhs.privateBits null.
+  * Every later getter or setter on `rhs` would then dereference null.
+  */
+ WriterOptions::WriterOptions(WriterOptions& rhs)
+     : privateBits(new WriterOptionsPrivate(*rhs.privateBits)) {
+ }
+
+ /**
+  * Replaces this object's settings with a deep copy of `rhs`'s settings.
+  * Self-assignment is a no-op.
+  */
+ WriterOptions& WriterOptions::operator=(const WriterOptions& rhs) {
+   if (this == &rhs) {
+     return *this;
+   }
+   privateBits.reset(new WriterOptionsPrivate(*rhs.privateBits));
+   return *this;
+ }
+
+ // Defined out of line where WriterOptionsPrivate is complete; the
+ // unique_ptr member releases the private block.
+ WriterOptions::~WriterOptions() = default;
+
+ // ---- WriterOptions accessors -------------------------------------------
+ // Each setter stores its argument in privateBits and returns *this so
+ // calls can be chained; each getter returns the stored value unchanged.
+
+ uint64_t WriterOptions::getStripeSize() const { return privateBits->stripeSize; }
+ WriterOptions& WriterOptions::setStripeSize(uint64_t size) {
+   privateBits->stripeSize = size;
+   return *this;
+ }
+
+ uint64_t WriterOptions::getBlockSize() const { return privateBits->blockSize; }
+ WriterOptions& WriterOptions::setBlockSize(uint64_t size) {
+   privateBits->blockSize = size;
+   return *this;
+ }
+
+ uint64_t WriterOptions::getRowIndexStride() const {
+   return privateBits->rowIndexStride;
+ }
+ WriterOptions& WriterOptions::setRowIndexStride(uint64_t stride) {
+   privateBits->rowIndexStride = stride;
+   return *this;
+ }
+
+ uint64_t WriterOptions::getBufferSize() const { return privateBits->bufferSize; }
+ WriterOptions& WriterOptions::setBufferSize(uint64_t size) {
+   privateBits->bufferSize = size;
+   return *this;
+ }
+
+ double WriterOptions::getDictionaryKeySizeThreshold() const {
+   return privateBits->dictionaryKeySizeThreshold;
+ }
+ WriterOptions& WriterOptions::setDictionaryKeySizeThreshold(double val) {
+   privateBits->dictionaryKeySizeThreshold = val;
+   return *this;
+ }
+
+ bool WriterOptions::getBlockPadding() const { return privateBits->blockPadding; }
+ WriterOptions& WriterOptions::setBlockPadding(bool padding) {
+   privateBits->blockPadding = padding;
+   return *this;
+ }
+
+ RleVersion WriterOptions::getRleVersion() const { return privateBits->rleVersion; }
+ WriterOptions& WriterOptions::setRleVersion(RleVersion version) {
+   privateBits->rleVersion = version;
+   return *this;
+ }
+
+ CompressionKind WriterOptions::getCompression() const {
+   return privateBits->compression;
+ }
+ WriterOptions& WriterOptions::setCompression(CompressionKind comp) {
+   privateBits->compression = comp;
+   return *this;
+ }
+
+ EncodingStrategy WriterOptions::getEncodingStrategy() const {
+   return privateBits->encodingStrategy;
+ }
+ WriterOptions& WriterOptions::setEncodingStrategy(EncodingStrategy strategy) {
+   privateBits->encodingStrategy = strategy;
+   return *this;
+ }
+
+ CompressionStrategy WriterOptions::getCompressionStrategy() const {
+   return privateBits->compressionStrategy;
+ }
+ WriterOptions& WriterOptions::setCompressionStrategy(
+     CompressionStrategy strategy) {
+   privateBits->compressionStrategy = strategy;
+   return *this;
+ }
+
+ WriterVersion WriterOptions::getWriterVersion() const { return privateBits->version; }
+ WriterOptions& WriterOptions::setWriterVersion(WriterVersion version) {
+   privateBits->version = version;
+   return *this;
+ }
+
+ double WriterOptions::getPaddingTolerance() const {
+   return privateBits->paddingTolerance;
+ }
+ WriterOptions& WriterOptions::setPaddingTolerance(double tolerance) {
+   privateBits->paddingTolerance = tolerance;
+   return *this;
+ }
+
+ // NOTE(review): the pool is kept as a raw pointer; presumably not owned,
+ // so the caller must keep it alive while these options are in use.
+ MemoryPool * WriterOptions::getMemoryPool() const { return privateBits->memoryPool; }
+ WriterOptions& WriterOptions::setMemoryPool(MemoryPool * memoryPool) {
+   privateBits->memoryPool = memoryPool;
+   return *this;
+ }
+
+ // Only the address of the stream is kept, so it must outlive these options.
+ std::ostream * WriterOptions::getErrorStream() const {
+   return privateBits->errorStream;
+ }
+ WriterOptions& WriterOptions::setErrorStream(std::ostream& errStream) {
+   privateBits->errorStream = &errStream;
+   return *this;
+ }
+
+ bool WriterOptions::getEnableStats() const { return privateBits->enableStats; }
+ WriterOptions& WriterOptions::setEnableStats(bool enable) {
+   privateBits->enableStats = enable;
+   return *this;
+ }
+
+ bool WriterOptions::getEnableStrStatsCmp() const {
+   return privateBits->enableStrStatsCmp;
+ }
+ WriterOptions& WriterOptions::setEnableStrStatsCmp(bool enable) {
+   privateBits->enableStrStatsCmp = enable;
+   return *this;
+ }
+
+ bool WriterOptions::getEnableIndex() const { return privateBits->enableIndex; }
+ WriterOptions& WriterOptions::setEnableIndex(bool enable) {
+   privateBits->enableIndex = enable;
+   return *this;
+ }
+
+ // The zone name is resolved immediately via getTimezoneByName(); only the
+ // resulting Timezone pointer is stored.
+ const Timezone* WriterOptions::getTimezone() const { return privateBits->timezone; }
+ WriterOptions& WriterOptions::setTimezone(const std::string& zone) {
+   privateBits->timezone = &getTimezoneByName(zone);
+   return *this;
+ }
+
+ // Out-of-line definition of the Writer base-class destructor; no cleanup
+ // is needed at this level.
+ Writer::~Writer() = default;
+
+ /**
+  * Concrete Writer. It owns the column writer tree, the streams factory and
+  * the streams created in its constructor. It also tracks row counts and
+  * the current output offset for the file being written.
+  *
+  * NOTE(review): `type` is held by reference, so the caller's Type must
+  * outlive this writer.
+  */
+ class WriterImpl : public Writer {
+ private:
+   std::unique_ptr<ColumnWriter> columnWriter;
+   std::unique_ptr<BufferedOutputStream> compressionStream;
+   std::unique_ptr<BufferedOutputStream> bufferedStream;
+   std::unique_ptr<StreamsFactory> streamsFactory;
+   OutputStream * outStream;        // raw pointer supplied by the caller
+   WriterOptions options;           // this writer's own copy of the options
+   const Type& type;
+   uint64_t stripeRows;
+   uint64_t totalRows;
+   uint64_t indexRows;
+   uint64_t currentOffset;
+   proto::Footer fileFooter;
+   proto::PostScript postScript;
+   proto::StripeInformation stripeInfo;
+   proto::Metadata metadata;
+
+   // Magic string "ORC"; defined right after the class.
+   static const char * magicId;
+
+ public:
+   WriterImpl(const Type& type,
+              OutputStream* stream,
+              const WriterOptions& options);
+
+   std::unique_ptr<ColumnVectorBatch> createRowBatch(uint64_t size) const override;
+
+   void add(ColumnVectorBatch& rowsToAdd) override;
+
+   void close() override;
+
+ private:
+   void Init();
+   void InitStripe();
+   void WriteStripe();
+   void WriteMetadata();
+   void WriteFileFooter();
+   void WritePostscript();
+   void BuildFooterType(const Type& t, proto::Footer& footer, uint32_t& index);
+   static proto::CompressionKind convertCompressionKind(const CompressionKind& kind);
+ };
+
+ const char * WriterImpl::magicId = "ORC";
+
+ /**
+  * Builds the streams factory and the column writer tree for `t`. Creates
+  * the compressed and the plain buffered output streams on `stream`, then
+  * calls Init(). Row counters and the current offset start at zero.
+  */
+ WriterImpl::WriterImpl(const Type& t,
+                        OutputStream * stream,
+                        const WriterOptions& opts)
+     : outStream(stream),
+       options(opts),
+       type(t),
+       stripeRows(0),
+       totalRows(0),
+       indexRows(0),
+       currentOffset(0) {
+   // buildWriter() dereferences streamsFactory, so create the factory first.
+   streamsFactory = createStreamsFactory(options, outStream);
+   columnWriter = buildWriter(type, *streamsFactory, options);
+
+   // NOTE(review): this capacity is hard-coded and does not read
+   // options.getBufferSize() (whose default is also 4M). Confirm whether
+   // setBufferSize() is meant to affect these two streams.
+   const uint64_t capacity = 4 * 1024 * 1024; // 4M
+   const uint64_t blockSize = options.getBlockSize();
+   MemoryPool& pool = *options.getMemoryPool();
+
+   compressionStream = createCompressor(options.getCompression(),
+                                        outStream,
+                                        options.getCompressionStrategy(),
+                                        capacity,
+                                        blockSize,
+                                        pool);
+   bufferedStream.reset(
+       new BufferedOutputStream(pool, outStream, capacity, blockSize));
+
+   Init();
+ }
+
+ /**
+  * Allocates a row batch with room for `size` rows of this writer's type.
+  * The batch is allocated from the memory pool configured in the options.
+  */
+ std::unique_ptr<ColumnVectorBatch>
+ WriterImpl::createRowBatch(uint64_t size) const {
+   MemoryPool& pool = *options.getMemoryPool();
+   return type.createRowBatch(size, pool);
+ }
+
+ void WriterImpl::add(ColumnVectorBatch& rowsToAdd) {
+ if (options.getEnableIndex()) {
+ uint64_t pos = 0;
+ uint64_t chunkSize = 0;
+ uint64_t rowIndexStride = options.getRowIndexStride();
+ while (pos < rowsToAdd.numElements) {
+ chunkSize = std::min(rowsToAdd.numElements - pos,
+ rowIndexStride - indexRows);
+ columnWriter->add(rowsToAdd, pos, chunkSize);
+
+ pos += chunkSize;
+ indexRows += chunkSize;
+ stripeRows += chunkSize;
+
+ if (indexRows >= rowIndexStride) {
+ columnWriter->createRowIndexEntry();
+ indexRows = 0;
+ }
+ }
+ } else {
+ stripeRows += rowsToAdd.numElements;
+ columnWriter->add(rowsToAdd, 0, rowsToAdd.numElements);
+ }
+
+ if (columnWriter->getEstimatedSize() >= options.getStripeSize()) {
+ WriteStripe();
+ }
--- End diff --
We won't throw an exception here. If the estimated size is smaller than the
stripe size, it just means the data added so far is not yet big enough to form
a stripe. We keep buffering the data in memory until a subsequent add() satisfies
this check, or until Writer::close() is called, which forces us to flush any
remaining data as the last stripe of the file.
> Implement Basic C++ Writer and Writer Option
> --------------------------------------------
>
> Key: ORC-178
> URL: https://issues.apache.org/jira/browse/ORC-178
> Project: ORC
> Issue Type: Sub-task
> Components: C++
> Reporter: Gang Wu
> Assignee: Xiening Dai
>
> 1. write orc file header, file footer, postscript, etc.
> 2. write columns of all types
> 3. write column statistics
> 4. write index streams in the writer, and let the reader seek to a row based on index information
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)