Github user wgtmac commented on a diff in the pull request:
https://github.com/apache/orc/pull/149#discussion_r140425006
--- Diff: c++/src/ColumnWriter.cc ---
@@ -468,25 +472,1099 @@ namespace orc {
rleEncoder->recordPosition(rowIndexPosition.get());
}
- std::unique_ptr<ColumnWriter> buildWriter(
- const Type& type,
- const StreamsFactory& factory,
- const WriterOptions& options) {
- switch (static_cast<int64_t>(type.getKind())) {
- case STRUCT:
- return std::unique_ptr<ColumnWriter>(
- new StructColumnWriter(
- type,
- factory,
- options));
- case INT:
- case LONG:
- case SHORT:
- return std::unique_ptr<ColumnWriter>(
- new IntegerColumnWriter(
- type,
- factory,
- options));
+ class ByteColumnWriter : public ColumnWriter {
+ public:
+ ByteColumnWriter(const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options);
+
+ virtual void add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues) override;
+
+ virtual void flush(std::vector<proto::Stream>& streams) override;
+
+ virtual uint64_t getEstimatedSize() const override;
+
+ virtual void getColumnEncoding(
+ std::vector<proto::ColumnEncoding>& encodings) const override;
+
+ virtual void recordPosition() const override;
+
+ private:
+ std::unique_ptr<ByteRleEncoder> byteRleEncoder;
+ };
+
+ ByteColumnWriter::ByteColumnWriter(
+ const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options) :
+ ColumnWriter(type, factory, options) {
+ std::unique_ptr<BufferedOutputStream> dataStream =
+ factory.createStream(proto::Stream_Kind_DATA);
+ byteRleEncoder = createByteRleEncoder(std::move(dataStream));
+
+ if (enableIndex) {
+ recordPosition();
+ }
+ }
+
+ void ByteColumnWriter::add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues) {
+ ColumnWriter::add(rowBatch, offset, numValues);
+
+ LongVectorBatch& byteBatch =
+ dynamic_cast<LongVectorBatch&>(rowBatch);
+
+ int64_t* data = byteBatch.data.data() + offset;
+ const char* notNull = byteBatch.hasNulls ?
+ byteBatch.notNull.data() + offset : nullptr;
+
+ char* byteData = reinterpret_cast<char*>(data);
+ for (uint64_t i = 0; i < numValues; ++i) {
+ byteData[i] = static_cast<char>(data[i]);
+ }
+ byteRleEncoder->add(byteData, numValues, notNull);
+
+ IntegerColumnStatisticsImpl* intStats =
+ dynamic_cast<IntegerColumnStatisticsImpl*>(colIndexStatistics.get());
+ bool hasNull = false;
+ for (uint64_t i = 0; i < numValues; ++i) {
+ if (notNull == nullptr || notNull[i]) {
+ intStats->increase(1);
+ intStats->update(static_cast<int64_t>(byteData[i]), 1);
+ } else if (!hasNull) {
+ hasNull = true;
+ }
+ }
+ intStats->setHasNull(hasNull);
+ }
+
+ void ByteColumnWriter::flush(std::vector<proto::Stream>& streams) {
+ ColumnWriter::flush(streams);
+
+ proto::Stream stream;
+ stream.set_kind(proto::Stream_Kind_DATA);
+ stream.set_column(static_cast<uint32_t>(columnId));
+ stream.set_length(byteRleEncoder->flush());
+ streams.push_back(stream);
+ }
+
+ uint64_t ByteColumnWriter::getEstimatedSize() const {
+ uint64_t size = ColumnWriter::getEstimatedSize();
+ size += byteRleEncoder->getBufferSize();
+ return size;
+ }
+
+ void ByteColumnWriter::getColumnEncoding(
+ std::vector<proto::ColumnEncoding>& encodings) const {
+ proto::ColumnEncoding encoding;
+ encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
+ encoding.set_dictionarysize(0);
+ encodings.push_back(encoding);
+ }
+
+ void ByteColumnWriter::recordPosition() const {
+ ColumnWriter::recordPosition();
+ byteRleEncoder->recordPosition(rowIndexPosition.get());
+ }
+
+ class BooleanColumnWriter : public ColumnWriter {
+ public:
+ BooleanColumnWriter(const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options);
+
+ virtual void add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues) override;
+
+ virtual void flush(std::vector<proto::Stream>& streams) override;
+
+ virtual uint64_t getEstimatedSize() const override;
+
+ virtual void getColumnEncoding(
+ std::vector<proto::ColumnEncoding>& encodings) const override;
+
+ virtual void recordPosition() const override;
+
+ private:
+ std::unique_ptr<ByteRleEncoder> rleEncoder;
+ };
+
+ BooleanColumnWriter::BooleanColumnWriter(
+ const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options) :
+ ColumnWriter(type, factory, options) {
+ std::unique_ptr<BufferedOutputStream> dataStream =
+ factory.createStream(proto::Stream_Kind_DATA);
+ rleEncoder = createBooleanRleEncoder(std::move(dataStream));
+
+ if (enableIndex) {
+ recordPosition();
+ }
+ }
+
+ void BooleanColumnWriter::add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues) {
+ ColumnWriter::add(rowBatch, offset, numValues);
+
+ LongVectorBatch& byteBatch = dynamic_cast<LongVectorBatch&>(rowBatch);
--- End diff --
Done
---