GitHub user wgtmac commented on a diff in the pull request:
https://github.com/apache/orc/pull/149#discussion_r140425023
--- Diff: c++/src/ColumnWriter.cc ---
@@ -468,25 +472,1099 @@ namespace orc {
rleEncoder->recordPosition(rowIndexPosition.get());
}
- std::unique_ptr<ColumnWriter> buildWriter(
- const Type& type,
- const StreamsFactory& factory,
- const WriterOptions& options) {
- switch (static_cast<int64_t>(type.getKind())) {
- case STRUCT:
- return std::unique_ptr<ColumnWriter>(
- new StructColumnWriter(
- type,
- factory,
- options));
- case INT:
- case LONG:
- case SHORT:
- return std::unique_ptr<ColumnWriter>(
- new IntegerColumnWriter(
- type,
- factory,
- options));
+  /**
+   * Column writer for byte-valued columns.
+   * add() reads values from a LongVectorBatch and feeds them, narrowed to
+   * bytes, into a ByteRleEncoder that owns the DATA stream.
+   */
+  class ByteColumnWriter : public ColumnWriter {
+  public:
+    ByteColumnWriter(const Type& type,
+                     const StreamsFactory& factory,
+                     const WriterOptions& options);
+
+    void add(ColumnVectorBatch& rowBatch,
+             uint64_t offset,
+             uint64_t numValues) override;
+
+    void flush(std::vector<proto::Stream>& streams) override;
+
+    uint64_t getEstimatedSize() const override;
+
+    void getColumnEncoding(
+      std::vector<proto::ColumnEncoding>& encodings) const override;
+
+    void recordPosition() const override;
+
+  private:
+    // Run-length encoder writing to this column's DATA stream.
+    std::unique_ptr<ByteRleEncoder> byteRleEncoder;
+  };
+
+  /**
+   * Creates the DATA stream and the byte RLE encoder that writes to it.
+   * When the row index is enabled, the initial positions are recorded.
+   */
+  ByteColumnWriter::ByteColumnWriter(
+                        const Type& type,
+                        const StreamsFactory& factory,
+                        const WriterOptions& options) :
+                            ColumnWriter(type, factory, options) {
+    // Ownership of the freshly created DATA stream moves into the encoder.
+    byteRleEncoder = createByteRleEncoder(
+      factory.createStream(proto::Stream_Kind_DATA));
+
+    if (enableIndex) {
+      recordPosition();
+    }
+  }
+
+  /**
+   * Appends rows [offset, offset + numValues) of a LongVectorBatch to the
+   * byte column and updates the row-group integer statistics.
+   *
+   * @param rowBatch  batch to read from; must be a LongVectorBatch
+   *                  (dynamic_cast throws std::bad_cast otherwise).
+   *                  NOTE: its data buffer is overwritten (see below).
+   * @param offset    first row of the batch to write.
+   * @param numValues number of rows to write.
+   */
+  void ByteColumnWriter::add(ColumnVectorBatch& rowBatch,
+                             uint64_t offset,
+                             uint64_t numValues) {
+    ColumnWriter::add(rowBatch, offset, numValues);
+
+    LongVectorBatch& byteBatch =
+      dynamic_cast<LongVectorBatch&>(rowBatch);
+
+    int64_t* data = byteBatch.data.data() + offset;
+    const char* notNull = byteBatch.hasNulls ?
+                          byteBatch.notNull.data() + offset : nullptr;
+
+    // Narrow each int64 value to a byte IN PLACE, which clobbers the
+    // caller's batch. The aliasing is safe: byte i lies inside data[i / 8],
+    // already consumed when i > 0, and data[0] is read before byte 0 is
+    // written.
+    char* byteData = reinterpret_cast<char*>(data);
+    for (uint64_t i = 0; i < numValues; ++i) {
+      byteData[i] = static_cast<char>(data[i]);
+    }
+    byteRleEncoder->add(byteData, numValues, notNull);
+
+    IntegerColumnStatisticsImpl* intStats =
+      dynamic_cast<IntegerColumnStatisticsImpl*>(colIndexStatistics.get());
+    bool hasNull = false;
+    for (uint64_t i = 0; i < numValues; ++i) {
+      if (notNull == nullptr || notNull[i]) {
+        intStats->increase(1);
+        // Byte values are signed. Plain `char` is unsigned on some ABIs
+        // (e.g. ARM), which would record -1 as 255 in min/max/sum.
+        intStats->update(
+          static_cast<int64_t>(static_cast<signed char>(byteData[i])), 1);
+      } else {
+        hasNull = true;
+      }
+    }
+    // Only ever raise the flag. A previous batch in the same row group may
+    // already have reported nulls, and a null-free batch must not clear it.
+    if (hasNull) {
+      intStats->setHasNull(true);
+    }
+  }
+
+  /**
+   * Flushes the base-class streams, then flushes the byte RLE encoder and
+   * appends a DATA stream descriptor whose length is the flushed size.
+   */
+  void ByteColumnWriter::flush(std::vector<proto::Stream>& streams) {
+    ColumnWriter::flush(streams);
+
+    proto::Stream dataStreamInfo;
+    dataStreamInfo.set_kind(proto::Stream_Kind_DATA);
+    dataStreamInfo.set_column(static_cast<uint32_t>(columnId));
+    dataStreamInfo.set_length(byteRleEncoder->flush());
+    streams.push_back(dataStreamInfo);
+  }
+
+  /**
+   * Estimated size: the base-class estimate plus the bytes currently
+   * buffered by the byte RLE encoder.
+   */
+  uint64_t ByteColumnWriter::getEstimatedSize() const {
+    uint64_t estimate = ColumnWriter::getEstimatedSize();
+    estimate += byteRleEncoder->getBufferSize();
+    return estimate;
+  }
+
+  /**
+   * Appends this column's encoding: DIRECT, with a dictionary size of 0.
+   */
+  void ByteColumnWriter::getColumnEncoding(
+                       std::vector<proto::ColumnEncoding>& encodings) const {
+    proto::ColumnEncoding directEncoding;
+    directEncoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
+    directEncoding.set_dictionarysize(0);
+    encodings.push_back(directEncoding);
+  }
+
+  /**
+   * Records row-index positions: the base-class positions first, then the
+   * byte RLE encoder's position, both into rowIndexPosition.
+   */
+  void ByteColumnWriter::recordPosition() const {
+    ColumnWriter::recordPosition();
+    byteRleEncoder->recordPosition(rowIndexPosition.get());
+  }
+
+  /**
+   * Column writer for boolean columns.
+   * add() reads values from a LongVectorBatch and feeds them to a boolean
+   * RLE encoder (typed as ByteRleEncoder) that owns the DATA stream.
+   */
+  class BooleanColumnWriter : public ColumnWriter {
+  public:
+    BooleanColumnWriter(const Type& type,
+                        const StreamsFactory& factory,
+                        const WriterOptions& options);
+
+    void add(ColumnVectorBatch& rowBatch,
+             uint64_t offset,
+             uint64_t numValues) override;
+
+    void flush(std::vector<proto::Stream>& streams) override;
+
+    uint64_t getEstimatedSize() const override;
+
+    void getColumnEncoding(
+      std::vector<proto::ColumnEncoding>& encodings) const override;
+
+    void recordPosition() const override;
+
+  private:
+    // Created by createBooleanRleEncoder; writes to the DATA stream.
+    std::unique_ptr<ByteRleEncoder> rleEncoder;
+  };
+
+  /**
+   * Creates the DATA stream and the boolean RLE encoder that writes to it.
+   * When the row index is enabled, the initial positions are recorded.
+   */
+  BooleanColumnWriter::BooleanColumnWriter(
+                           const Type& type,
+                           const StreamsFactory& factory,
+                           const WriterOptions& options) :
+                               ColumnWriter(type, factory, options) {
+    // Ownership of the freshly created DATA stream moves into the encoder.
+    rleEncoder = createBooleanRleEncoder(
+      factory.createStream(proto::Stream_Kind_DATA));
+
+    if (enableIndex) {
+      recordPosition();
+    }
+  }
+
+  /**
+   * Appends rows [offset, offset + numValues) of a LongVectorBatch to the
+   * boolean column and updates the row-group boolean statistics.
+   *
+   * @param rowBatch  batch to read from; must be a LongVectorBatch
+   *                  (dynamic_cast throws std::bad_cast otherwise).
+   *                  NOTE: its data buffer is overwritten (see below).
+   * @param offset    first row of the batch to write.
+   * @param numValues number of rows to write.
+   */
+  void BooleanColumnWriter::add(ColumnVectorBatch& rowBatch,
+                                uint64_t offset,
+                                uint64_t numValues) {
+    ColumnWriter::add(rowBatch, offset, numValues);
+
+    LongVectorBatch& byteBatch = dynamic_cast<LongVectorBatch&>(rowBatch);
+    int64_t* data = byteBatch.data.data() + offset;
+    const char* notNull = byteBatch.hasNulls ?
+                          byteBatch.notNull.data() + offset : nullptr;
+
+    // Normalize each value to 0/1 and store it as a byte IN PLACE, which
+    // clobbers the caller's batch. A plain narrowing cast would turn
+    // non-zero values such as 256 into 0 (false). The aliasing is safe:
+    // byte i lies inside data[i / 8], already consumed when i > 0.
+    char* byteData = reinterpret_cast<char*>(data);
+    for (uint64_t i = 0; i < numValues; ++i) {
+      byteData[i] = static_cast<char>(data[i] != 0);
+    }
+    rleEncoder->add(byteData, numValues, notNull);
+
+    BooleanColumnStatisticsImpl* boolStats =
+      dynamic_cast<BooleanColumnStatisticsImpl*>(colIndexStatistics.get());
+    bool hasNull = false;
+    for (uint64_t i = 0; i < numValues; ++i) {
+      if (notNull == nullptr || notNull[i]) {
+        boolStats->increase(1);
+        boolStats->update(byteData[i] != 0, 1);
+      } else {
+        hasNull = true;
+      }
+    }
+    // Only ever raise the flag. A previous batch in the same row group may
+    // already have reported nulls, and a null-free batch must not clear it.
+    if (hasNull) {
+      boolStats->setHasNull(true);
+    }
+  }
+
+  /**
+   * Flushes the base-class streams, then flushes the boolean RLE encoder
+   * and appends a DATA stream descriptor whose length is the flushed size.
+   */
+  void BooleanColumnWriter::flush(std::vector<proto::Stream>& streams) {
+    ColumnWriter::flush(streams);
+
+    proto::Stream dataStreamInfo;
+    dataStreamInfo.set_kind(proto::Stream_Kind_DATA);
+    dataStreamInfo.set_column(static_cast<uint32_t>(columnId));
+    dataStreamInfo.set_length(rleEncoder->flush());
+    streams.push_back(dataStreamInfo);
+  }
+
+  /**
+   * Estimated size: the base-class estimate plus the bytes currently
+   * buffered by the boolean RLE encoder.
+   */
+  uint64_t BooleanColumnWriter::getEstimatedSize() const {
+    uint64_t estimate = ColumnWriter::getEstimatedSize();
+    estimate += rleEncoder->getBufferSize();
+    return estimate;
+  }
+
+  /**
+   * Appends this column's encoding: DIRECT, with a dictionary size of 0.
+   */
+  void BooleanColumnWriter::getColumnEncoding(
+                       std::vector<proto::ColumnEncoding>& encodings) const {
+    proto::ColumnEncoding directEncoding;
+    directEncoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
+    directEncoding.set_dictionarysize(0);
+    encodings.push_back(directEncoding);
+  }
+
+  /**
+   * Records row-index positions: the base-class positions first, then the
+   * boolean RLE encoder's position, both into rowIndexPosition.
+   */
+  void BooleanColumnWriter::recordPosition() const {
+    ColumnWriter::recordPosition();
+    rleEncoder->recordPosition(rowIndexPosition.get());
+  }
+
+  /**
+   * Column writer for floating-point columns.
+   * Values come from a DoubleVectorBatch and are written to the DATA
+   * stream, 4 bytes per value when isFloat is set and 8 bytes otherwise.
+   */
+  class DoubleColumnWriter : public ColumnWriter {
+  public:
+    DoubleColumnWriter(const Type& type,
+                       const StreamsFactory& factory,
+                       const WriterOptions& options,
+                       bool isFloat);
+
+    void add(ColumnVectorBatch& rowBatch,
+             uint64_t offset,
+             uint64_t numValues) override;
+
+    void flush(std::vector<proto::Stream>& streams) override;
+
+    uint64_t getEstimatedSize() const override;
+
+    void getColumnEncoding(
+      std::vector<proto::ColumnEncoding>& encodings) const override;
+
+    void recordPosition() const override;
+
+  private:
+    // Member order matters: the constructor's initializer list relies on it.
+    bool isFloat;                                        // 4-byte vs 8-byte values
+    std::unique_ptr<AppendOnlyBufferedStream> dataStream; // DATA stream sink
+    DataBuffer<char> buffer;                             // scratch for one value's bytes
+  };
+
+  /**
+   * Wraps the DATA stream in an AppendOnlyBufferedStream and sizes the
+   * per-value scratch buffer (4 bytes for float, 8 for double). When the
+   * row index is enabled, the initial positions are recorded.
+   */
+  DoubleColumnWriter::DoubleColumnWriter(
+                          const Type& type,
+                          const StreamsFactory& factory,
+                          const WriterOptions& options,
+                          bool isFloatType) :
+                              ColumnWriter(type, factory, options),
+                              isFloat(isFloatType),
+                              buffer(*options.getMemoryPool()) {
+    dataStream.reset(new AppendOnlyBufferedStream(
+      factory.createStream(proto::Stream_Kind_DATA)));
+
+    const size_t valueWidth = isFloat ? 4 : 8;
+    buffer.resize(valueWidth);
+
+    if (enableIndex) {
+      recordPosition();
+    }
+  }
+
+ void DoubleColumnWriter::add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues) {
+ ColumnWriter::add(rowBatch, offset, numValues);
+
+ const DoubleVectorBatch& dblBatch =
+ dynamic_cast<const
DoubleVectorBatch&>(rowBatch);
+
+ const double* doubleData = dblBatch.data.data() + offset;
+ const char* notNull = dblBatch.hasNulls ?
+ dblBatch.notNull.data() + offset : nullptr;
+
+ size_t bytes = isFloat ? 4 : 8;
+ char* data = buffer.data();
+
+ for (uint64_t i = 0; i < numValues; ++i) {
+ if (!notNull || notNull[i]) {
+ if (isFloat) {
+ // to avoid float-double cast
+ const int32_t* intBits =
+ reinterpret_cast<const int32_t*>(&static_cast<const float&>(
+ doubleData[i]));
+ for (size_t j = 0; j < bytes; ++j) {
+ data[j] = static_cast<char>(((*intBits) >> (8 * j)) & 0xff);
+ }
+ } else {
+ const int64_t* intBits =
+ reinterpret_cast<const int64_t*>(&(doubleData[i]));
+ for (size_t j = 0; j < bytes; ++j) {
+ data[j] = static_cast<char>(((*intBits) >> (8 * j)) & 0xff);
+ }
--- End diff --
Done
---