This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/main by this push:
new 08c3480ac ORC-1264: [C++] Add a writer option to align compression
block with row group
08c3480ac is described below
commit 08c3480aceeb2805cadc594c181b7c3c835d19c3
Author: luffy-zh <[email protected]>
AuthorDate: Thu Nov 7 12:05:25 2024 -0800
ORC-1264: [C++] Add a writer option to align compression block with row
group
## What changes were proposed in this pull request?
Add support for the ORC writer to ensure that compression blocks are
aligned with row group boundaries.
## Why are the changes needed?
To reduce unnecessary I/O and decompression when predicate pushdown (PPD) is
in effect, we can force compression blocks to align with row group boundaries.
For more detail, see
[link](https://issues.apache.org/jira/projects/ORC/issues/ORC-1264?filter=allopenissues)
## How was this patch tested?
Unit tests in TestWriter.cc cover this patch.
## Was this patch authored or co-authored using generative AI tooling?
NO
Closes #2005 from luffy-zh/ORC-1264.
Lead-authored-by: luffy-zh <[email protected]>
Co-authored-by: Hao Zou <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
c++/include/orc/Reader.hh | 19 ++++++
c++/include/orc/Writer.hh | 13 ++++
c++/src/ColumnWriter.cc | 111 +++++++++++++++++++++++++++++++
c++/src/ColumnWriter.hh | 12 ++++
c++/src/Compression.cc | 17 ++---
c++/src/Reader.cc | 67 ++++++++++++++++---
c++/src/Reader.hh | 11 ++--
c++/src/Writer.cc | 14 ++++
c++/src/io/OutputStream.cc | 11 +++-
c++/src/io/OutputStream.hh | 1 +
c++/test/TestWriter.cc | 159 ++++++++++++++++++++++++++++++++++++++-------
11 files changed, 386 insertions(+), 49 deletions(-)
diff --git a/c++/include/orc/Reader.hh b/c++/include/orc/Reader.hh
index 4b254593e..4ddce64ad 100644
--- a/c++/include/orc/Reader.hh
+++ b/c++/include/orc/Reader.hh
@@ -62,6 +62,15 @@ namespace orc {
};
ReaderMetrics* getDefaultReaderMetrics();
+ // Row group index of a single column in a stripe, as returned by
+ // Reader::getRowGroupIndex().
+ struct RowGroupIndex {
+ // positions[r] holds the positions stored in the ROW_INDEX entry of row
+ // group r, in the order they appear in that entry. The inner vectors are
+ // expected to have the same length for every row group of a column.
+ std::vector<std::vector<uint64_t>> positions;
+ };
+
/**
* Options for creating a Reader.
*/
@@ -605,6 +614,16 @@ namespace orc {
*/
virtual std::map<uint32_t, BloomFilterIndex> getBloomFilters(
uint32_t stripeIndex, const std::set<uint32_t>& included) const = 0;
+
+ /**
+ * Get the row group index of the selected columns in the specified stripe.
+ * @param stripeIndex index of the stripe whose ROW_INDEX streams are read.
+ * @param included ids of the columns to return; an empty set (the default)
+ * means all columns.
+ * @return map from column id to that column's row group index.
+ * @throws std::logic_error (in ReaderImpl) if stripeIndex is out of range.
+ * @throws ParseError (in ReaderImpl) if a ROW_INDEX stream fails to parse.
+ * Note: default arguments of virtual functions are bound statically; the
+ * ReaderImpl override declares none, so code calling through a ReaderImpl
+ * must pass `included` explicitly.
+ */
+ virtual std::map<uint32_t, RowGroupIndex> getRowGroupIndex(
+ uint32_t stripeIndex, const std::set<uint32_t>& included = {}) const =
0;
};
/**
diff --git a/c++/include/orc/Writer.hh b/c++/include/orc/Writer.hh
index b560627c4..78f06739b 100644
--- a/c++/include/orc/Writer.hh
+++ b/c++/include/orc/Writer.hh
@@ -290,6 +290,19 @@ namespace orc {
* @return if not set, return default value which is 64 KB.
*/
uint64_t getMemoryBlockSize() const;
+
+ /**
+ * Set whether each compression block should end on a row group boundary.
+ * When enabled, the writer finishes every column's streams right before it
+ * creates each row index entry. Boolean streams are the exception: the
+ * Boolean RLE encoder packs bits into bytes, so a partially filled byte is
+ * carried over into the next compression block.
+ * @param alignBlockBoundToRowGroup true to enable the alignment.
+ * @return this WriterOptions, for chaining.
+ */
+ WriterOptions& setAlignBlockBoundToRowGroup(bool
alignBlockBoundToRowGroup);
+
+ /**
+ * Get whether compression blocks are aligned to row group boundaries.
+ * @return if not set, return default value which is false.
+ */
+ bool getAlignBlockBoundToRowGroup() const;
};
class Writer {
diff --git a/c++/src/ColumnWriter.cc b/c++/src/ColumnWriter.cc
index 7adca1440..d31b1c65d 100644
--- a/c++/src/ColumnWriter.cc
+++ b/c++/src/ColumnWriter.cc
@@ -254,6 +254,10 @@ namespace orc {
// PASS
}
+ // Base implementation: finish the not-null (presence) encoder that every
+ // column writer owns. The overrides in this file call this first and then
+ // finish their own encoders/streams.
+ void ColumnWriter::finishStreams() {
+ notNullEncoder->finishEncode();
+ }
+
class StructColumnWriter : public ColumnWriter {
public:
StructColumnWriter(const Type& type, const StreamsFactory& factory,
@@ -283,6 +287,8 @@ namespace orc {
virtual void reset() override;
+ virtual void finishStreams() override;
+
private:
std::vector<std::unique_ptr<ColumnWriter>> children_;
};
@@ -416,6 +422,13 @@ namespace orc {
}
}
+ // Finish the struct's own presence stream, then recurse into each child
+ // writer in field order.
+ void StructColumnWriter::finishStreams() {
+   ColumnWriter::finishStreams();
+   for (auto& child : children_) {
+     child->finishStreams();
+   }
+ }
+
template <typename BatchType>
class IntegerColumnWriter : public ColumnWriter {
public:
@@ -433,6 +446,8 @@ namespace orc {
virtual void recordPosition() const override;
+ virtual void finishStreams() override;
+
protected:
std::unique_ptr<RleEncoder> rleEncoder;
@@ -528,6 +543,12 @@ namespace orc {
rleEncoder->recordPosition(rowIndexPosition.get());
}
+ // Finish the presence stream, then the integer RLE data encoder.
+ template <typename BatchType>
+ void IntegerColumnWriter<BatchType>::finishStreams() {
+ ColumnWriter::finishStreams();
+ rleEncoder->finishEncode();
+ }
+
template <typename BatchType>
class ByteColumnWriter : public ColumnWriter {
public:
@@ -544,6 +565,8 @@ namespace orc {
virtual void recordPosition() const override;
+ virtual void finishStreams() override;
+
private:
std::unique_ptr<ByteRleEncoder> byteRleEncoder_;
};
@@ -637,6 +660,12 @@ namespace orc {
byteRleEncoder_->recordPosition(rowIndexPosition.get());
}
+ // Finish the presence stream, then the byte RLE data encoder.
+ template <typename BatchType>
+ void ByteColumnWriter<BatchType>::finishStreams() {
+ ColumnWriter::finishStreams();
+ byteRleEncoder_->finishEncode();
+ }
+
template <typename BatchType>
class BooleanColumnWriter : public ColumnWriter {
public:
@@ -654,6 +683,8 @@ namespace orc {
virtual void recordPosition() const override;
+ virtual void finishStreams() override;
+
private:
std::unique_ptr<ByteRleEncoder> rleEncoder_;
};
@@ -750,6 +781,12 @@ namespace orc {
rleEncoder_->recordPosition(rowIndexPosition.get());
}
+ // Finish the presence stream, then the boolean data encoder. As documented
+ // on ColumnWriter::finishStreams(), a partially filled byte of the Boolean
+ // RLE stream is presumably carried into the next compression block.
+ template <typename BatchType>
+ void BooleanColumnWriter<BatchType>::finishStreams() {
+ ColumnWriter::finishStreams();
+ rleEncoder_->finishEncode();
+ }
+
template <typename ValueType, typename BatchType>
class FloatingColumnWriter : public ColumnWriter {
public:
@@ -767,6 +804,8 @@ namespace orc {
virtual void recordPosition() const override;
+ virtual void finishStreams() override;
+
private:
bool isFloat_;
std::unique_ptr<AppendOnlyBufferedStream> dataStream_;
@@ -878,6 +917,12 @@ namespace orc {
dataStream_->recordPosition(rowIndexPosition.get());
}
+ // Finish the presence stream, then the buffered floating-point data stream.
+ template <typename ValueType, typename BatchType>
+ void FloatingColumnWriter<ValueType, BatchType>::finishStreams() {
+ ColumnWriter::finishStreams();
+ dataStream_->finishStream();
+ }
+
/**
* Implementation of increasing sorted string dictionary
*/
@@ -1041,6 +1086,8 @@ namespace orc {
virtual void reset() override;
+ virtual void finishStreams() override;
+
private:
/**
* dictionary related functions
@@ -1234,6 +1281,14 @@ namespace orc {
}
}
+ // Finish the presence stream and, for direct encoding only, the data and
+ // length streams. Nothing dictionary-related is finished here.
+ // NOTE(review): presumably dictionary-encoded streams are only written when
+ // the dictionary is flushed, so they cannot be aligned per row group — confirm.
+ void StringColumnWriter::finishStreams() {
+ ColumnWriter::finishStreams();
+ if (!useDictionary) {
+ directDataStream->finishStream();
+ directLengthEncoder->finishEncode();
+ }
+ }
+
bool StringColumnWriter::checkDictionaryKeyRatio() {
if (!doneDictionaryCheck) {
useDictionary = dictionary.size() <=
@@ -1583,6 +1638,8 @@ namespace orc {
virtual void recordPosition() const override;
+ virtual void finishStreams() override;
+
protected:
std::unique_ptr<RleEncoder> secRleEncoder, nanoRleEncoder;
@@ -1723,6 +1780,12 @@ namespace orc {
nanoRleEncoder->recordPosition(rowIndexPosition.get());
}
+ // Finish the presence stream, then the seconds encoder and the nanoseconds
+ // encoder, in that order.
+ void TimestampColumnWriter::finishStreams() {
+ ColumnWriter::finishStreams();
+ secRleEncoder->finishEncode();
+ nanoRleEncoder->finishEncode();
+ }
+
class DateColumnWriter : public IntegerColumnWriter<LongVectorBatch> {
public:
DateColumnWriter(const Type& type, const StreamsFactory& factory, const
WriterOptions& options);
@@ -1792,6 +1855,8 @@ namespace orc {
virtual void recordPosition() const override;
+ virtual void finishStreams() override;
+
protected:
RleVersion rleVersion;
uint64_t precision;
@@ -1910,6 +1975,12 @@ namespace orc {
scaleEncoder->recordPosition(rowIndexPosition.get());
}
+ // Finish the presence stream, then the value stream and the scale encoder.
+ // Also used by Decimal128ColumnWriter, which derives from this class.
+ void Decimal64ColumnWriter::finishStreams() {
+ ColumnWriter::finishStreams();
+ valueStream->finishStream();
+ scaleEncoder->finishEncode();
+ }
+
class Decimal64ColumnWriterV2 : public ColumnWriter {
public:
Decimal64ColumnWriterV2(const Type& type, const StreamsFactory& factory,
@@ -1926,6 +1997,8 @@ namespace orc {
virtual void recordPosition() const override;
+ virtual void finishStreams() override;
+
protected:
uint64_t precision;
uint64_t scale;
@@ -2016,6 +2089,11 @@ namespace orc {
valueEncoder->recordPosition(rowIndexPosition.get());
}
+ // Finish the presence stream, then the single value encoder.
+ void Decimal64ColumnWriterV2::finishStreams() {
+ ColumnWriter::finishStreams();
+ valueEncoder->finishEncode();
+ }
+
class Decimal128ColumnWriter : public Decimal64ColumnWriter {
public:
Decimal128ColumnWriter(const Type& type, const StreamsFactory& factory,
@@ -2131,6 +2209,8 @@ namespace orc {
virtual void reset() override;
+ virtual void finishStreams() override;
+
private:
std::unique_ptr<RleEncoder> lengthEncoder_;
RleVersion rleVersion_;
@@ -2307,6 +2387,14 @@ namespace orc {
}
}
+ // Finish the list's presence and length streams, then the element writer
+ // (if one was created).
+ void ListColumnWriter::finishStreams() {
+ ColumnWriter::finishStreams();
+ lengthEncoder_->finishEncode();
+ if (child_) {
+ child_->finishStreams();
+ }
+ }
+
class MapColumnWriter : public ColumnWriter {
public:
MapColumnWriter(const Type& type, const StreamsFactory& factory, const
WriterOptions& options);
@@ -2339,6 +2427,8 @@ namespace orc {
virtual void reset() override;
+ virtual void finishStreams() override;
+
private:
std::unique_ptr<ColumnWriter> keyWriter_;
std::unique_ptr<ColumnWriter> elemWriter_;
@@ -2557,6 +2647,17 @@ namespace orc {
}
}
+ // Finish the map's presence and length streams, then the key writer and the
+ // value writer (each only if it exists).
+ void MapColumnWriter::finishStreams() {
+ ColumnWriter::finishStreams();
+ lengthEncoder_->finishEncode();
+ if (keyWriter_) {
+ keyWriter_->finishStreams();
+ }
+ if (elemWriter_) {
+ elemWriter_->finishStreams();
+ }
+ }
+
class UnionColumnWriter : public ColumnWriter {
public:
UnionColumnWriter(const Type& type, const StreamsFactory& factory,
@@ -2589,6 +2690,8 @@ namespace orc {
virtual void reset() override;
+ virtual void finishStreams() override;
+
private:
std::unique_ptr<ByteRleEncoder> rleEncoder_;
std::vector<std::unique_ptr<ColumnWriter>> children_;
@@ -2760,6 +2863,14 @@ namespace orc {
}
}
+ // Finish the union's own streams (presence, then its byte RLE encoder,
+ // presumably the tag stream), then every alternative's writer in order.
+ void UnionColumnWriter::finishStreams() {
+   ColumnWriter::finishStreams();
+   rleEncoder_->finishEncode();
+   for (auto& child : children_) {
+     child->finishStreams();
+   }
+ }
+
std::unique_ptr<ColumnWriter> buildWriter(const Type& type, const
StreamsFactory& factory,
const WriterOptions& options) {
switch (static_cast<int64_t>(type.getKind())) {
diff --git a/c++/src/ColumnWriter.hh b/c++/src/ColumnWriter.hh
index 8afd1eb72..1c5e15d70 100644
--- a/c++/src/ColumnWriter.hh
+++ b/c++/src/ColumnWriter.hh
@@ -179,6 +179,18 @@ namespace orc {
*/
virtual void writeDictionary();
+ /**
+ * Push all buffered data of this column through its encoders and
+ * compression so that the current compression block ends here; compound
+ * writers also forward the call to their children. The writer calls this
+ * at each row group boundary, right before createRowIndexEntry(), when
+ * WriterOptions::getAlignBlockBoundToRowGroup() is true, and writing
+ * continues afterwards.
+ * Note: a Boolean RLE stream cannot cut off a byte holding fewer than 8
+ * bits, otherwise the reader could decode the unfilled trailing bits. Such
+ * a byte is kept and becomes the head of the next compression block.
+ */
+ virtual void finishStreams();
+
protected:
/**
* Utility function to translate ColumnStatistics into protobuf form and
diff --git a/c++/src/Compression.cc b/c++/src/Compression.cc
index b5ca5a4c9..f373a75bf 100644
--- a/c++/src/Compression.cc
+++ b/c++/src/Compression.cc
@@ -176,6 +176,7 @@ namespace orc {
}
virtual void finishStream() override {
compressInternal();
+ // After compressing what is buffered, also run the base-class
+ // finishStream() so the underlying buffered output is finished too.
+ BufferedOutputStream::finishStream();
}
protected:
@@ -982,13 +983,7 @@ namespace orc {
}
uint64_t BlockCompressionStream::flush() {
- void* data;
- int size;
- if (!Next(&data, &size)) {
- throw CompressionError("Failed to flush compression buffer.");
- }
- BufferedOutputStream::BackUp(outputSize - outputPosition);
- bufferSize = outputSize = outputPosition = 0;
+ // The block-emitting logic moved into finishStream(); flush() reuses it
+ // before flushing the underlying stream.
+ finishStream();
return BufferedOutputStream::flush();
}
@@ -1031,7 +1026,13 @@ namespace orc {
}
void BlockCompressionStream::finishStream() {
- doBlockCompression();
+ // Request the next buffer via Next() (presumably this compresses the data
+ // buffered so far), then hand the unused output back and reset the
+ // counters so subsequent writes start a new compression block.
+ void* data;
+ int size;
+ if (!Next(&data, &size)) {
+ throw CompressionError("Failed to flush compression buffer.");
+ }
+ BufferedOutputStream::BackUp(outputSize - outputPosition);
+ bufferSize = outputSize = outputPosition = 0;
}
/**
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index 2966c2c2e..034ea04ee 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -1426,17 +1426,10 @@ namespace orc {
uint32_t stripeIndex, const std::set<uint32_t>& included) const {
std::map<uint32_t, BloomFilterIndex> ret;
- // find stripe info
- if (stripeIndex >= static_cast<uint32_t>(footer_->stripes_size())) {
- throw std::logic_error("Illegal stripe index: " +
- to_string(static_cast<int64_t>(stripeIndex)));
- }
- const proto::StripeInformation currentStripeInfo =
- footer_->stripes(static_cast<int>(stripeIndex));
- const proto::StripeFooter currentStripeFooter =
getStripeFooter(currentStripeInfo, *contents_);
+ uint64_t offset;
+ auto currentStripeFooter = loadCurrentStripeFooter(stripeIndex, offset);
// iterate stripe footer to get stream of bloom_filter
- uint64_t offset = static_cast<uint64_t>(currentStripeInfo.offset());
for (int i = 0; i < currentStripeFooter.streams_size(); i++) {
const proto::Stream& stream = currentStripeFooter.streams(i);
uint32_t column = static_cast<uint32_t>(stream.column());
@@ -1474,6 +1467,62 @@ namespace orc {
return ret;
}
+ // Validate stripeIndex, set `offset` to the stripe's starting file offset
+ // (where its first stream begins) and return the stripe's parsed footer.
+ // Throws std::logic_error when stripeIndex is out of range.
+ proto::StripeFooter ReaderImpl::loadCurrentStripeFooter(uint32_t stripeIndex,
+                                                         uint64_t& offset) const {
+   // find stripe info
+   if (stripeIndex >= static_cast<uint32_t>(footer_->stripes_size())) {
+     throw std::logic_error("Illegal stripe index: " +
+                            to_string(static_cast<int64_t>(stripeIndex)));
+   }
+   // Bind by const reference: the repeated-field accessor already returns one,
+   // and copying the protobuf message would allocate for nothing.
+   const proto::StripeInformation& currentStripeInfo =
+       footer_->stripes(static_cast<int>(stripeIndex));
+   offset = static_cast<uint64_t>(currentStripeInfo.offset());
+   return getStripeFooter(currentStripeInfo, *contents_);
+ }
+
+ // Decode the ROW_INDEX stream of every selected column (all columns when
+ // `included` is empty) in the given stripe.
+ // Returns a map keyed by column id that contains only columns for which a
+ // selected ROW_INDEX stream exists.
+ // Throws std::logic_error for an illegal stripe index and ParseError when a
+ // ROW_INDEX stream cannot be parsed.
+ std::map<uint32_t, RowGroupIndex> ReaderImpl::getRowGroupIndex(
+     uint32_t stripeIndex, const std::set<uint32_t>& included) const {
+   std::map<uint32_t, RowGroupIndex> ret;
+   uint64_t offset;
+   auto currentStripeFooter = loadCurrentStripeFooter(stripeIndex, offset);
+
+   // Streams are stored back to back from the stripe offset, so the running
+   // offset must advance past every stream, whether it is selected or not.
+   for (int i = 0; i < currentStripeFooter.streams_size(); i++) {
+     const proto::Stream& stream = currentStripeFooter.streams(i);
+     uint32_t column = static_cast<uint32_t>(stream.column());
+     uint64_t length = static_cast<uint64_t>(stream.length());
+
+     if (stream.kind() == proto::Stream_Kind_ROW_INDEX &&
+         (included.empty() || included.find(column) != included.end())) {
+       std::unique_ptr<SeekableInputStream> pbStream = createDecompressor(
+           contents_->compression,
+           std::make_unique<SeekableFileInputStream>(contents_->stream.get(), offset,
+                                                     length, *contents_->pool),
+           contents_->blockSize, *(contents_->pool), contents_->readerMetrics);
+
+       proto::RowIndex pbRowIndex;
+       if (!pbRowIndex.ParseFromZeroCopyStream(pbStream.get())) {
+         std::stringstream errMsgBuffer;
+         errMsgBuffer << "Failed to parse RowIndex at column " << column << " in stripe "
+                      << stripeIndex;
+         throw ParseError(errMsgBuffer.str());
+       }
+
+       // Create the map entry only here: default-inserting it for every
+       // stream would add empty entries for unselected columns.
+       RowGroupIndex& rowGroupIndex = ret[column];
+       rowGroupIndex.positions.reserve(static_cast<size_t>(pbRowIndex.entry_size()));
+       for (const auto& rowIndexEntry : pbRowIndex.entry()) {
+         rowGroupIndex.positions.emplace_back(rowIndexEntry.positions().begin(),
+                                              rowIndexEntry.positions().end());
+       }
+     }
+     offset += length;
+   }
+   return ret;
+ }
+
RowReader::~RowReader() {
// PASS
}
diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh
index 630d812c3..89606c331 100644
--- a/c++/src/Reader.hh
+++ b/c++/src/Reader.hh
@@ -265,10 +265,10 @@ namespace orc {
// internal methods
void readMetadata() const;
void checkOrcVersion();
- void getRowIndexStatistics(
- const proto::StripeInformation& stripeInfo, uint64_t stripeIndex,
- const proto::StripeFooter& currentStripeFooter,
- std::vector<std::vector<proto::ColumnStatistics> >* indexStats) const;
+ void getRowIndexStatistics(const proto::StripeInformation& stripeInfo,
uint64_t stripeIndex,
+ const proto::StripeFooter& currentStripeFooter,
+
std::vector<std::vector<proto::ColumnStatistics>>* indexStats) const;
+ // Validate stripeIndex, set `offset` to that stripe's starting file offset
+ // and return its parsed stripe footer. Throws std::logic_error on an
+ // out-of-range index.
+ proto::StripeFooter loadCurrentStripeFooter(uint32_t stripeIndex,
uint64_t& offset) const;
// metadata
mutable bool isMetadataLoaded_;
@@ -374,6 +374,9 @@ namespace orc {
std::map<uint32_t, BloomFilterIndex> getBloomFilters(
uint32_t stripeIndex, const std::set<uint32_t>& included) const
override;
+
+ // Decode the ROW_INDEX streams of the selected columns (all columns when
+ // `included` is empty) in the given stripe. No default argument here: the
+ // `= {}` default only applies when called through a Reader reference.
+ std::map<uint32_t, RowGroupIndex> getRowGroupIndex(
+ uint32_t stripeIndex, const std::set<uint32_t>& included) const
override;
};
} // namespace orc
diff --git a/c++/src/Writer.cc b/c++/src/Writer.cc
index 531b56655..775e6d245 100644
--- a/c++/src/Writer.cc
+++ b/c++/src/Writer.cc
@@ -47,6 +47,7 @@ namespace orc {
bool useTightNumericVector;
uint64_t outputBufferCapacity;
uint64_t memoryBlockSize;
+ bool alignBlockBoundToRowGroup;
WriterOptionsPrivate() : fileVersion(FileVersion::v_0_12()) { // default
to Hive_0_12
stripeSize = 64 * 1024 * 1024; // 64M
@@ -69,6 +70,7 @@ namespace orc {
useTightNumericVector = false;
outputBufferCapacity = 1024 * 1024;
memoryBlockSize = 64 * 1024; // 64K
+ alignBlockBoundToRowGroup = false;
}
};
@@ -298,6 +300,15 @@ namespace orc {
return privateBits_->memoryBlockSize;
}
+ // Enable/disable finishing all column streams before each row index entry
+ // is created, so compression blocks end on row group boundaries.
+ WriterOptions& WriterOptions::setAlignBlockBoundToRowGroup(bool
alignBlockBoundToRowGroup) {
+ privateBits_->alignBlockBoundToRowGroup = alignBlockBoundToRowGroup;
+ return *this;
+ }
+
+ // Defaults to false (initialised in WriterOptionsPrivate's constructor).
+ bool WriterOptions::getAlignBlockBoundToRowGroup() const {
+ return privateBits_->alignBlockBoundToRowGroup;
+ }
+
Writer::~Writer() {
// PASS
}
@@ -401,6 +412,9 @@ namespace orc {
stripeRows_ += chunkSize;
if (indexRows_ >= rowIndexStride) {
+ if (options_.getAlignBlockBoundToRowGroup()) {
+ columnWriter_->finishStreams();
+ }
columnWriter_->createRowIndexEntry();
indexRows_ = 0;
}
diff --git a/c++/src/io/OutputStream.cc b/c++/src/io/OutputStream.cc
index aa4dbe6ed..fbf1ca61d 100644
--- a/c++/src/io/OutputStream.cc
+++ b/c++/src/io/OutputStream.cc
@@ -128,9 +128,7 @@ namespace orc {
}
uint64_t AppendOnlyBufferedStream::flush() {
- outStream_->BackUp(bufferLength_ - bufferOffset_);
- bufferOffset_ = bufferLength_ = 0;
- buffer_ = nullptr;
+ // Return unused buffer space and finish the underlying stream's pending
+ // block via finishStream(), then flush the underlying stream.
+ finishStream();
return outStream_->flush();
}
@@ -150,4 +148,11 @@ namespace orc {
}
}
+ // Give the unused tail of the current buffer back to outStream_, ask it to
+ // finish its pending data, then drop the local buffer (presumably so the
+ // next write obtains a fresh one from outStream_).
+ void AppendOnlyBufferedStream::finishStream() {
+ outStream_->BackUp(bufferLength_ - bufferOffset_);
+ outStream_->finishStream();
+ bufferOffset_ = bufferLength_ = 0;
+ buffer_ = nullptr;
+ }
+
} // namespace orc
diff --git a/c++/src/io/OutputStream.hh b/c++/src/io/OutputStream.hh
index 4908f34f2..6319de96d 100644
--- a/c++/src/io/OutputStream.hh
+++ b/c++/src/io/OutputStream.hh
@@ -100,6 +100,7 @@ namespace orc {
void write(const char* data, size_t size);
uint64_t getSize() const;
uint64_t flush();
+ void finishStream();
void recordPosition(PositionRecorder* recorder) const;
};
diff --git a/c++/test/TestWriter.cc b/c++/test/TestWriter.cc
index 8bc4032a5..975462e30 100644
--- a/c++/test/TestWriter.cc
+++ b/c++/test/TestWriter.cc
@@ -57,6 +57,8 @@ namespace orc {
options.setTimezoneName(timezone);
options.setUseTightNumericVector(useTightNumericVector);
options.setMemoryBlockSize(memoryBlockSize);
+ // enable align block bound to row group when stride is not 0
+ options.setAlignBlockBoundToRowGroup(true);
return createWriter(type, stream, options);
}
@@ -84,7 +86,56 @@ namespace orc {
return reader->createRowReader(rowReaderOpts);
}
- class WriterTest : public TestWithParam<FileVersion> {
+ // For every stripe and every column directly under the root struct, check
+ // that each row group starts a new compression block. After finishStreams()
+ // the test expects only the compressed-offset slot of each stream to be
+ // non-zero; every other recorded position must be 0.
+ void verifyCompressionBlockAlignment(std::unique_ptr<Reader>& reader, uint64_t columnCount) {
+   const auto stripeCount = reader->getNumberOfStripes();
+   for (uint64_t stripeIndex = 0; stripeIndex < stripeCount; ++stripeIndex) {
+     // The map covers all columns of the stripe, so read it once per stripe.
+     auto rowGroupIndexMap = reader->getRowGroupIndex(static_cast<uint32_t>(stripeIndex));
+     EXPECT_FALSE(rowGroupIndexMap.empty());
+     for (uint64_t i = 0; i < columnCount; ++i) {
+       const Type* subType = reader->getType().getSubtype(i);
+       // Look up the column actually described by subType, so each column is
+       // checked against its own kind (not always the column `columnCount`).
+       const uint32_t columnId = static_cast<uint32_t>(subType->getColumnId());
+       const RowGroupIndex& rowGroupIndex = rowGroupIndexMap[columnId];
+       EXPECT_FALSE(rowGroupIndex.positions.empty()) << "column " << columnId;
+
+       // Slots that hold a stream's compressed-block offset; only these may be
+       // non-zero for an aligned row group.
+       const TypeKind kind = subType->getKind();
+       auto mayBeNonZero = [kind](uint64_t posIndex) {
+         switch (kind) {
+           case DECIMAL:
+           case STRING:
+           case BINARY:
+           case CHAR:
+           case VARCHAR:
+             return posIndex == 0 || posIndex == 2;
+           case TIMESTAMP_INSTANT:
+           case TIMESTAMP:
+             return posIndex == 0 || posIndex == 3;
+           default:
+             return posIndex == 0;
+         }
+       };
+
+       for (const auto& rowGroupPositions : rowGroupIndex.positions) {
+         for (uint64_t posIndex = 0; posIndex < rowGroupPositions.size(); ++posIndex) {
+           if (!mayBeNonZero(posIndex)) {
+             EXPECT_EQ(rowGroupPositions[posIndex], 0U)
+                 << "column " << columnId << ", position " << posIndex;
+           }
+         }
+       }
+     }
+   }
+ }
+
+ // Parameters of the WriterTest suite.
+ struct TestParams {
+ // ORC file version the writer produces.
+ FileVersion fileVersion;
+ // When true, tests write with a 1024-row index stride and run
+ // verifyCompressionBlockAlignment(); otherwise the stride is 0.
+ bool enableAlignBlockBoundToRowGroup;
+ };
+
+ class WriterTest : public TestWithParam<TestParams> {
// You can implement all the usual fixture class members here.
// To access the test parameter, call GetParam() from class
// TestWithParam<T>.
@@ -92,13 +143,15 @@ namespace orc {
protected:
FileVersion fileVersion;
+ bool enableAlignBlockBoundToRowGroup;
public:
- WriterTest() : fileVersion(FileVersion::v_0_11()) {}
+ WriterTest() : fileVersion(FileVersion::v_0_11()),
enableAlignBlockBoundToRowGroup(false) {}
};
void WriterTest::SetUp() {
- fileVersion = GetParam();
+ // Unpack both fields of this run's TestParams.
+ fileVersion = GetParam().fileVersion;
+ enableAlignBlockBoundToRowGroup =
GetParam().enableAlignBlockBoundToRowGroup;
}
TEST_P(WriterTest, writeEmptyFile) {
@@ -252,7 +305,7 @@ namespace orc {
std::unique_ptr<Writer> writer =
createWriter(stripeSize, memoryBlockSize, compressionBlockSize,
CompressionKind_ZLIB, *type,
- pool, &memStream, fileVersion);
+ pool, &memStream, fileVersion,
enableAlignBlockBoundToRowGroup ? 1024 : 0);
std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(65535);
StructVectorBatch* structBatch =
dynamic_cast<StructVectorBatch*>(batch.get());
StringVectorBatch* strBatch =
dynamic_cast<StringVectorBatch*>(structBatch->fields[0]);
@@ -294,6 +347,9 @@ namespace orc {
EXPECT_EQ(i, static_cast<uint64_t>(atoi(str.c_str())));
EXPECT_EQ(i, static_cast<uint64_t>(atoi(bin.c_str())));
}
+ if (enableAlignBlockBoundToRowGroup) {
+ verifyCompressionBlockAlignment(reader, type->getSubtypeCount());
+ }
EXPECT_FALSE(rowReader->next(*batch));
}
@@ -315,7 +371,7 @@ namespace orc {
std::unique_ptr<Writer> writer =
createWriter(stripeSize, memoryBlockSize, compressionBlockSize,
CompressionKind_ZLIB, *type,
- pool, &memStream, fileVersion);
+ pool, &memStream, fileVersion,
enableAlignBlockBoundToRowGroup ? 1024 : 0);
std::unique_ptr<ColumnVectorBatch> batch =
writer->createRowBatch(rowCount);
StructVectorBatch* structBatch =
dynamic_cast<StructVectorBatch*>(batch.get());
DoubleVectorBatch* doubleBatch =
dynamic_cast<DoubleVectorBatch*>(structBatch->fields[0]);
@@ -351,6 +407,10 @@ namespace orc {
0.000001f);
}
EXPECT_FALSE(rowReader->next(*batch));
+
+ if (enableAlignBlockBoundToRowGroup) {
+ verifyCompressionBlockAlignment(reader, type->getSubtypeCount());
+ }
}
TEST_P(WriterTest, writeShortIntLong) {
@@ -366,7 +426,7 @@ namespace orc {
std::unique_ptr<Writer> writer =
createWriter(stripeSize, memoryBlockSize, compressionBlockSize,
CompressionKind_ZLIB, *type,
- pool, &memStream, fileVersion);
+ pool, &memStream, fileVersion,
enableAlignBlockBoundToRowGroup ? 1024 : 0);
std::unique_ptr<ColumnVectorBatch> batch =
writer->createRowBatch(rowCount);
StructVectorBatch* structBatch =
dynamic_cast<StructVectorBatch*>(batch.get());
LongVectorBatch* smallIntBatch =
dynamic_cast<LongVectorBatch*>(structBatch->fields[0]);
@@ -403,6 +463,9 @@ namespace orc {
EXPECT_EQ(static_cast<int32_t>(i), intBatch->data[i]);
EXPECT_EQ(static_cast<int64_t>(i), bigIntBatch->data[i]);
}
+ if (enableAlignBlockBoundToRowGroup) {
+ verifyCompressionBlockAlignment(reader, type->getSubtypeCount());
+ }
}
TEST_P(WriterTest, writeTinyint) {
@@ -415,9 +478,9 @@ namespace orc {
uint64_t rowCount = 65535;
uint64_t memoryBlockSize = 64;
- std::unique_ptr<Writer> writer =
- createWriter(stripeSize, memoryBlockSize, compressionBlockSize,
CompressionKind_ZSTD, *type,
- pool, &memStream, fileVersion, 1024, "GMT", true);
+ std::unique_ptr<Writer> writer = createWriter(
+ stripeSize, memoryBlockSize, compressionBlockSize,
CompressionKind_ZSTD, *type, pool,
+ &memStream, fileVersion, enableAlignBlockBoundToRowGroup ? 1024 : 0,
"GMT", true);
std::unique_ptr<ColumnVectorBatch> batch =
writer->createRowBatch(rowCount);
StructVectorBatch* structBatch =
dynamic_cast<StructVectorBatch*>(batch.get());
ByteVectorBatch* byteBatch =
dynamic_cast<ByteVectorBatch*>(structBatch->fields[0]);
@@ -442,6 +505,9 @@ namespace orc {
batch = rowReader->createRowBatch(rowCount);
rowReader->seekToRow(20);
EXPECT_EQ(true, rowReader->next(*batch));
+ if (enableAlignBlockBoundToRowGroup) {
+ verifyCompressionBlockAlignment(reader, type->getSubtypeCount());
+ }
structBatch = dynamic_cast<StructVectorBatch*>(batch.get());
auto outByteBatch = dynamic_cast<LongVectorBatch*>(structBatch->fields[0]);
@@ -474,7 +540,7 @@ namespace orc {
std::unique_ptr<Writer> writer =
createWriter(stripeSize, memoryBlockSize, compressionBlockSize,
CompressionKind_ZLIB, *type,
- pool, &memStream, fileVersion);
+ pool, &memStream, fileVersion,
enableAlignBlockBoundToRowGroup ? 1024 : 0);
std::unique_ptr<ColumnVectorBatch> batch =
writer->createRowBatch(rowCount);
StructVectorBatch* structBatch =
dynamic_cast<StructVectorBatch*>(batch.get());
LongVectorBatch* byteBatch =
dynamic_cast<LongVectorBatch*>(structBatch->fields[0]);
@@ -501,6 +567,9 @@ namespace orc {
for (uint64_t i = 0; i < rowCount; ++i) {
EXPECT_EQ((i % 3) == 0 ? 1 : 0, byteBatch->data[i]);
}
+ if (enableAlignBlockBoundToRowGroup) {
+ verifyCompressionBlockAlignment(reader, type->getSubtypeCount());
+ }
}
TEST_P(WriterTest, writeDate) {
@@ -515,7 +584,7 @@ namespace orc {
std::unique_ptr<Writer> writer =
createWriter(stripeSize, memoryBlockSize, compressionBlockSize,
CompressionKind_ZLIB, *type,
- pool, &memStream, fileVersion);
+ pool, &memStream, fileVersion,
enableAlignBlockBoundToRowGroup ? 1024 : 0);
std::unique_ptr<ColumnVectorBatch> batch =
writer->createRowBatch(rowCount);
StructVectorBatch* structBatch =
dynamic_cast<StructVectorBatch*>(batch.get());
@@ -543,6 +612,9 @@ namespace orc {
for (uint64_t i = 0; i < rowCount; ++i) {
EXPECT_EQ(static_cast<int32_t>(i), longBatch->data[i]);
}
+ if (enableAlignBlockBoundToRowGroup) {
+ verifyCompressionBlockAlignment(reader, type->getSubtypeCount());
+ }
}
TEST_P(WriterTest, writeTimestamp) {
@@ -557,7 +629,7 @@ namespace orc {
std::unique_ptr<Writer> writer =
createWriter(stripeSize, memoryBlockSize, compressionBlockSize,
CompressionKind_ZLIB, *type,
- pool, &memStream, fileVersion);
+ pool, &memStream, fileVersion,
enableAlignBlockBoundToRowGroup ? 1024 : 0);
std::unique_ptr<ColumnVectorBatch> batch =
writer->createRowBatch(rowCount);
StructVectorBatch* structBatch =
dynamic_cast<StructVectorBatch*>(batch.get());
TimestampVectorBatch* tsBatch =
dynamic_cast<TimestampVectorBatch*>(structBatch->fields[0]);
@@ -589,14 +661,18 @@ namespace orc {
EXPECT_EQ(times[i], tsBatch->data[i]);
EXPECT_EQ(i * 1000, tsBatch->nanoseconds[i]);
}
+ if (enableAlignBlockBoundToRowGroup) {
+ verifyCompressionBlockAlignment(reader, type->getSubtypeCount());
+ }
}
TEST_P(WriterTest, writeNegativeTimestamp) {
MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
MemoryPool* pool = getDefaultPool();
std::unique_ptr<Type>
type(Type::buildTypeFromString("struct<a:timestamp>"));
- auto writer = createWriter(16 * 1024 * 1024, 64 * 1024, 256 * 1024,
CompressionKind_ZLIB, *type,
- pool, &memStream, fileVersion);
+ auto writer =
+ createWriter(16 * 1024 * 1024, 64 * 1024, 256 * 1024,
CompressionKind_ZLIB, *type, pool,
+ &memStream, fileVersion, enableAlignBlockBoundToRowGroup
? 1024 : 0);
uint64_t batchCount = 5;
auto batch = writer->createRowBatch(batchCount * 2);
auto structBatch = dynamic_cast<StructVectorBatch*>(batch.get());
@@ -646,6 +722,10 @@ namespace orc {
}
EXPECT_EQ(1000000, tsBatch->nanoseconds[i]);
}
+
+ if (enableAlignBlockBoundToRowGroup) {
+ verifyCompressionBlockAlignment(reader, type->getSubtypeCount());
+ }
}
// TODO: Disable the test below for Windows for following reasons:
@@ -766,7 +846,7 @@ namespace orc {
std::unique_ptr<Writer> writer =
createWriter(stripeSize, memoryBlockSize, compressionBlockSize,
CompressionKind_ZLIB, *type,
- pool, &memStream, fileVersion);
+ pool, &memStream, fileVersion,
enableAlignBlockBoundToRowGroup ? 1024 : 0);
std::unique_ptr<ColumnVectorBatch> batch =
writer->createRowBatch(rowCount);
StructVectorBatch* structBatch =
dynamic_cast<StructVectorBatch*>(batch.get());
TimestampVectorBatch* tsBatch =
dynamic_cast<TimestampVectorBatch*>(structBatch->fields[0]);
@@ -798,6 +878,9 @@ namespace orc {
EXPECT_EQ(times[i], tsBatch->data[i]);
EXPECT_EQ(i * 1000, tsBatch->nanoseconds[i]);
}
+ if (enableAlignBlockBoundToRowGroup) {
+ verifyCompressionBlockAlignment(reader, type->getSubtypeCount());
+ }
}
TEST_P(WriterTest, writeCharAndVarcharColumn) {
@@ -815,7 +898,7 @@ namespace orc {
std::unique_ptr<Writer> writer =
createWriter(stripeSize, memoryBlockSize, compressionBlockSize,
CompressionKind_ZLIB, *type,
- pool, &memStream, fileVersion);
+ pool, &memStream, fileVersion,
enableAlignBlockBoundToRowGroup ? 1024 : 0);
std::unique_ptr<ColumnVectorBatch> batch =
writer->createRowBatch(rowCount);
StructVectorBatch* structBatch =
dynamic_cast<StructVectorBatch*>(batch.get());
@@ -877,6 +960,9 @@ namespace orc {
}
EXPECT_FALSE(rowReader->next(*batch));
+ if (enableAlignBlockBoundToRowGroup) {
+ verifyCompressionBlockAlignment(reader, type->getSubtypeCount());
+ }
}
TEST_P(WriterTest, writeDecimal64Column) {
@@ -892,7 +978,7 @@ namespace orc {
std::unique_ptr<Writer> writer =
createWriter(stripeSize, memoryBlockSize, compressionBlockSize,
CompressionKind_ZLIB, *type,
- pool, &memStream, fileVersion);
+ pool, &memStream, fileVersion,
enableAlignBlockBoundToRowGroup ? 1024 : 0);
std::unique_ptr<ColumnVectorBatch> batch =
writer->createRowBatch(rowCount);
StructVectorBatch* structBatch =
dynamic_cast<StructVectorBatch*>(batch.get());
Decimal64VectorBatch* decBatch =
dynamic_cast<Decimal64VectorBatch*>(structBatch->fields[0]);
@@ -954,6 +1040,9 @@ namespace orc {
EXPECT_EQ(dec, decBatch->values[i]);
EXPECT_EQ(-dec, decBatch->values[i + maxPrecision]);
}
+ if (enableAlignBlockBoundToRowGroup) {
+ verifyCompressionBlockAlignment(reader, type->getSubtypeCount());
+ }
}
TEST_P(WriterTest, writeDecimal128Column) {
@@ -969,7 +1058,7 @@ namespace orc {
std::unique_ptr<Writer> writer =
createWriter(stripeSize, memoryBlockSize, compressionBlockSize,
CompressionKind_ZLIB, *type,
- pool, &memStream, fileVersion);
+ pool, &memStream, fileVersion,
enableAlignBlockBoundToRowGroup ? 1024 : 0);
std::unique_ptr<ColumnVectorBatch> batch =
writer->createRowBatch(rowCount);
StructVectorBatch* structBatch =
dynamic_cast<StructVectorBatch*>(batch.get());
Decimal128VectorBatch* decBatch =
dynamic_cast<Decimal128VectorBatch*>(structBatch->fields[0]);
@@ -1041,6 +1130,9 @@ namespace orc {
EXPECT_EQ(expected, decBatch->values[i].toString());
EXPECT_EQ("-" + expected, decBatch->values[i + maxPrecision].toString());
}
+ if (enableAlignBlockBoundToRowGroup) {
+ verifyCompressionBlockAlignment(reader, type->getSubtypeCount());
+ }
}
TEST_P(WriterTest, writeListColumn) {
@@ -1058,7 +1150,7 @@ namespace orc {
std::unique_ptr<Writer> writer =
createWriter(stripeSize, memoryBlockSize, compressionBlockSize,
CompressionKind_ZLIB, *type,
- pool, &memStream, fileVersion);
+ pool, &memStream, fileVersion,
enableAlignBlockBoundToRowGroup ? 1024 : 0);
std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(rowCount
* maxListLength);
StructVectorBatch* structBatch =
dynamic_cast<StructVectorBatch*>(batch.get());
@@ -1104,6 +1196,9 @@ namespace orc {
EXPECT_EQ(static_cast<int64_t>(i), data[offsets[i] + j]);
}
}
+ if (enableAlignBlockBoundToRowGroup) {
+ verifyCompressionBlockAlignment(reader, type->getSubtypeCount());
+ }
}
TEST_P(WriterTest, writeMapColumn) {
@@ -1118,7 +1213,7 @@ namespace orc {
std::unique_ptr<Writer> writer =
createWriter(stripeSize, memoryBlockSize, compressionBlockSize,
CompressionKind_ZLIB, *type,
- pool, &memStream, fileVersion);
+ pool, &memStream, fileVersion,
enableAlignBlockBoundToRowGroup ? 1024 : 0);
std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(rowCount
* maxListLength);
StructVectorBatch* structBatch =
dynamic_cast<StructVectorBatch*>(batch.get());
MapVectorBatch* mapBatch =
dynamic_cast<MapVectorBatch*>(structBatch->fields[0]);
@@ -1185,6 +1280,9 @@ namespace orc {
EXPECT_EQ(static_cast<int64_t>(i), elemData[offsets[i] + j]);
}
}
+ if (enableAlignBlockBoundToRowGroup) {
+ verifyCompressionBlockAlignment(reader, type->getSubtypeCount());
+ }
}
TEST_P(WriterTest, writeUnionColumn) {
@@ -1200,7 +1298,7 @@ namespace orc {
std::unique_ptr<Writer> writer =
createWriter(stripeSize, memoryBlockSize, compressionBlockSize,
CompressionKind_ZLIB, *type,
- pool, &memStream, fileVersion);
+ pool, &memStream, fileVersion,
enableAlignBlockBoundToRowGroup ? 1024 : 0);
std::unique_ptr<ColumnVectorBatch> batch =
writer->createRowBatch(rowCount);
StructVectorBatch* structBatch =
dynamic_cast<StructVectorBatch*>(batch.get());
UnionVectorBatch* unionBatch =
dynamic_cast<UnionVectorBatch*>(structBatch->fields[0]);
@@ -1282,6 +1380,9 @@ namespace orc {
break;
}
}
+ if (enableAlignBlockBoundToRowGroup) {
+ verifyCompressionBlockAlignment(reader, type->getSubtypeCount());
+ }
}
TEST_P(WriterTest, writeUTF8CharAndVarcharColumn) {
@@ -1295,7 +1396,7 @@ namespace orc {
uint64_t memoryBlockSize = 64;
std::unique_ptr<Writer> writer =
createWriter(stripeSize, memoryBlockSize, compressionBlockSize,
CompressionKind_ZLIB, *type,
- pool, &memStream, fileVersion);
+ pool, &memStream, fileVersion,
enableAlignBlockBoundToRowGroup ? 1024 : 0);
std::unique_ptr<ColumnVectorBatch> batch =
writer->createRowBatch(rowCount);
StructVectorBatch* structBatch =
dynamic_cast<StructVectorBatch*>(batch.get());
StringVectorBatch* charBatch =
dynamic_cast<StringVectorBatch*>(structBatch->fields[0]);
@@ -1353,6 +1454,9 @@ namespace orc {
EXPECT_TRUE(memcmp(varcharBatch->data[2], expectedTwoChars, 4) == 0);
EXPECT_FALSE(rowReader->next(*batch));
+ if (enableAlignBlockBoundToRowGroup) {
+ verifyCompressionBlockAlignment(reader, type->getSubtypeCount());
+ }
}
TEST_P(WriterTest, testWriteListColumnWithNull) {
@@ -2296,7 +2400,12 @@ namespace orc {
EXPECT_FALSE(rowReader->next(*batch));
}
- INSTANTIATE_TEST_SUITE_P(OrcTest, WriterTest,
- Values(FileVersion::v_0_11(), FileVersion::v_0_12(),
- FileVersion::UNSTABLE_PRE_2_0()));
+ // Every writer test runs for each file version, once with and once without
+ // compression-block alignment.
+ std::vector<TestParams> testParams = {{FileVersion::v_0_11(), true},
+ {FileVersion::v_0_11(), false},
+ {FileVersion::v_0_12(), false},
+ {FileVersion::v_0_12(), true},
+ {FileVersion::UNSTABLE_PRE_2_0(),
false},
+ {FileVersion::UNSTABLE_PRE_2_0(),
true}};
+
+ INSTANTIATE_TEST_SUITE_P(OrcTest, WriterTest,
::testing::ValuesIn(testParams));
} // namespace orc