This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/branch-1.8 by this push:
new 67a59db2b ORC-1304: [C++] Fix seeking over empty PRESENT stream (#1519)
67a59db2b is described below
commit 67a59db2b4d7b3a241b82b2d637943c1b3b84fce
Author: Quanlong Huang <[email protected]>
AuthorDate: Thu May 25 14:36:54 2023 +0800
ORC-1304: [C++] Fix seeking over empty PRESENT stream (#1519)
### What changes were proposed in this pull request?
This backports ORC-1304 (#1299) to branch-1.8. Some conflicts were resolved
because ReaderMetrics is not supported in branch-1.8.
### Why are the changes needed?
The bug fixed by ORC-1304 also occurs on branch-1.8.
### How was this patch tested?
Ran orc-test
Co-authored-by: coderex2522 <[email protected]>
---
c++/src/ByteRLE.cc | 17 ++++----
c++/src/RLEv1.cc | 24 ++++++------
c++/src/RLEv1.hh | 2 +
c++/test/TestByteRle.cc | 11 ++++++
c++/test/TestReader.cc | 98 ++++++++++++++++++++++++++++++++++++++++++++++
c++/test/TestRleDecoder.cc | 13 +++++-
6 files changed, 146 insertions(+), 19 deletions(-)
diff --git a/c++/src/ByteRLE.cc b/c++/src/ByteRLE.cc
index 1c4a64516..557fc8fd7 100644
--- a/c++/src/ByteRLE.cc
+++ b/c++/src/ByteRLE.cc
@@ -341,6 +341,7 @@ namespace orc {
inline void nextBuffer();
inline signed char readByte();
inline void readHeader();
+ inline void reset();
std::unique_ptr<SeekableInputStream> inputStream;
size_t remainingValues;
@@ -380,9 +381,7 @@ namespace orc {
}
}
- ByteRleDecoderImpl::ByteRleDecoderImpl(std::unique_ptr<SeekableInputStream>
- input) {
- inputStream = std::move(input);
+ void ByteRleDecoderImpl::reset() {
repeating = false;
remainingValues = 0;
value = 0;
@@ -390,6 +389,12 @@ namespace orc {
bufferEnd = nullptr;
}
+ ByteRleDecoderImpl::ByteRleDecoderImpl(
+ std::unique_ptr<SeekableInputStream> input) {
+ inputStream = std::move(input);
+ reset();
+ }
+
ByteRleDecoderImpl::~ByteRleDecoderImpl() {
// PASS
}
@@ -397,10 +402,8 @@ namespace orc {
void ByteRleDecoderImpl::seek(PositionProvider& location) {
// move the input stream
inputStream->seek(location);
- // force a re-read from the stream
- bufferEnd = bufferStart;
- // read a new header
- readHeader();
+ // reset the decoder status and lazily call readHeader()
+ reset();
// skip ahead the given number of records
ByteRleDecoderImpl::skip(location.next());
}
diff --git a/c++/src/RLEv1.cc b/c++/src/RLEv1.cc
index fe333978d..d80a81a56 100644
--- a/c++/src/RLEv1.cc
+++ b/c++/src/RLEv1.cc
@@ -190,25 +190,27 @@ void RleDecoderV1::readHeader() {
}
}
+void RleDecoderV1::reset() {
+ remainingValues = 0;
+ value = 0;
+ bufferStart = nullptr;
+ bufferEnd = nullptr;
+ delta = 0;
+ repeating = false;
+}
+
RleDecoderV1::RleDecoderV1(std::unique_ptr<SeekableInputStream> input,
bool hasSigned)
: inputStream(std::move(input)),
- isSigned(hasSigned),
- remainingValues(0),
- value(0),
- bufferStart(nullptr),
- bufferEnd(bufferStart),
- delta(0),
- repeating(false) {
+ isSigned(hasSigned) {
+ reset();
}
void RleDecoderV1::seek(PositionProvider& location) {
// move the input stream
inputStream->seek(location);
- // force a re-read from the stream
- bufferEnd = bufferStart;
- // read a new header
- readHeader();
+ // reset the decoder status and lazily call readHeader()
+ reset();
// skip ahead the given number of records
skip(location.next());
}
diff --git a/c++/src/RLEv1.hh b/c++/src/RLEv1.hh
index 8e31d7087..f05ab17a1 100644
--- a/c++/src/RLEv1.hh
+++ b/c++/src/RLEv1.hh
@@ -77,6 +77,8 @@ private:
inline void skipLongs(uint64_t numValues);
+ inline void reset();
+
const std::unique_ptr<SeekableInputStream> inputStream;
const bool isSigned;
uint64_t remainingValues;
diff --git a/c++/test/TestByteRle.cc b/c++/test/TestByteRle.cc
index 38c01d336..bdcc13c9e 100644
--- a/c++/test/TestByteRle.cc
+++ b/c++/test/TestByteRle.cc
@@ -813,6 +813,17 @@ TEST(ByteRle, testSeek) {
} while (i != 0);
}
+TEST(ByteRle, seekOverEmptyPresentStream) {
+ const char* buffer = nullptr;
+ std::unique_ptr<ByteRleDecoder> rle =
+ createByteRleDecoder(
+ std::unique_ptr<orc::SeekableInputStream>
+ (new SeekableArrayInputStream(buffer, 0, 1)));
+ std::list<uint64_t> position(2, 0);
+ PositionProvider location(position);
+ rle->seek(location);
+}
+
TEST(BooleanRle, simpleTest) {
const unsigned char buffer[] = {0x61, 0xf0, 0xfd, 0x55, 0xAA, 0x55};
std::unique_ptr<SeekableInputStream> stream
diff --git a/c++/test/TestReader.cc b/c++/test/TestReader.cc
index 237deab05..7578d81a7 100644
--- a/c++/test/TestReader.cc
+++ b/c++/test/TestReader.cc
@@ -672,4 +672,102 @@ namespace orc {
EXPECT_EQ(1, nestedUnionBatch.offsets.data()[1]);
}
+ TEST(TestReadIntent, testSeekOverEmptyPresentStream) {
+ MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+ MemoryPool* pool = getDefaultPool();
+ uint64_t rowCount = 5000;
+ {
+ auto type = std::unique_ptr<Type>(
+ Type::buildTypeFromString(
+ "struct<col1:struct<col2:int>,col3:struct<col4:int>,"
+ "col5:array<int>,col6:map<int,int>>"));
+ WriterOptions options;
+ options.setStripeSize(1024 * 1024)
+ .setCompressionBlockSize(1024)
+ .setCompression(CompressionKind_NONE)
+ .setMemoryPool(pool)
+ .setRowIndexStride(1000);
+
+ // the child columns of the col3,col5,col6 have the empty present stream
+ auto writer = createWriter(*type, &memStream, options);
+ auto batch = writer->createRowBatch(rowCount);
+ auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch);
+ auto& structBatch1 =
dynamic_cast<StructVectorBatch&>(*structBatch.fields[0]);
+ auto& structBatch2 =
dynamic_cast<StructVectorBatch&>(*structBatch.fields[1]);
+ auto& listBatch = dynamic_cast<ListVectorBatch&>(*structBatch.fields[2]);
+ auto& mapBatch = dynamic_cast<MapVectorBatch&>(*structBatch.fields[3]);
+
+ auto& longBatch1 =
dynamic_cast<LongVectorBatch&>(*structBatch1.fields[0]);
+ auto& longBatch2 =
dynamic_cast<LongVectorBatch&>(*structBatch2.fields[0]);
+ auto& longBatch3 = dynamic_cast<LongVectorBatch&>(*listBatch.elements);
+ auto& longKeyBatch = dynamic_cast<LongVectorBatch&>(*mapBatch.keys);
+ auto& longValueBatch =
dynamic_cast<LongVectorBatch&>(*mapBatch.elements);
+
+ structBatch.numElements = rowCount;
+ structBatch1.numElements = rowCount;
+ structBatch2.numElements = rowCount;
+ listBatch.numElements = rowCount;
+ mapBatch.numElements = rowCount;
+ longBatch1.numElements = rowCount;
+ longBatch2.numElements = rowCount;
+ longBatch3.numElements = rowCount;
+ longKeyBatch.numElements = rowCount;
+ longValueBatch.numElements = rowCount;
+
+ structBatch1.hasNulls = false;
+ structBatch2.hasNulls = true;
+ listBatch.hasNulls = true;
+ mapBatch.hasNulls = true;
+ longBatch1.hasNulls = false;
+ longBatch2.hasNulls = true;
+ longBatch3.hasNulls = true;
+ longKeyBatch.hasNulls = true;
+ longValueBatch.hasNulls = true;
+ for (uint64_t i = 0; i < rowCount; ++i) {
+ longBatch1.data[i] = static_cast<int64_t>(i);
+ longBatch1.notNull[i] = 1;
+
+ structBatch2.notNull[i] = 0;
+ listBatch.notNull[i] = 0;
+ listBatch.offsets[i] = 0;
+ mapBatch.notNull[i] = 0;
+ longBatch2.notNull[i] = 0;
+ longBatch3.notNull[i] = 0;
+ longKeyBatch.notNull[i] = 0;
+ longValueBatch.notNull[i] = 0;
+ }
+ writer->add(*batch);
+ writer->close();
+ }
+ {
+ std::unique_ptr<InputStream> inStream(
+ new MemoryInputStream(memStream.getData(), memStream.getLength()));
+ ReaderOptions readerOptions;
+ readerOptions.setMemoryPool(*pool);
+ std::unique_ptr<Reader> reader =
+ createReader(std::move(inStream), readerOptions);
+ EXPECT_EQ(rowCount, reader->getNumberOfRows());
+ std::unique_ptr<RowReader> rowReader =
+ reader->createRowReader(RowReaderOptions());
+ auto batch = rowReader->createRowBatch(1000);
+ // seek over the empty present stream
+ rowReader->seekToRow(2000);
+ EXPECT_TRUE(rowReader->next(*batch));
+ EXPECT_EQ(1000, batch->numElements);
+ auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch);
+ auto& structBatch1 =
dynamic_cast<StructVectorBatch&>(*structBatch.fields[0]);
+ auto& structBatch2 =
dynamic_cast<StructVectorBatch&>(*structBatch.fields[1]);
+ auto& listBatch = dynamic_cast<ListVectorBatch&>(*structBatch.fields[2]);
+ auto& mapBatch = dynamic_cast<MapVectorBatch&>(*structBatch.fields[3]);
+
+ auto& longBatch1 =
dynamic_cast<LongVectorBatch&>(*structBatch1.fields[0]);
+ for (uint64_t i = 0; i < 1000; ++i) {
+ EXPECT_EQ(longBatch1.data[i], static_cast<int64_t>(i + 2000));
+ EXPECT_TRUE(longBatch1.notNull[i]);
+ EXPECT_FALSE(structBatch2.notNull[i]);
+ EXPECT_FALSE(listBatch.notNull[i]);
+ EXPECT_FALSE(mapBatch.notNull[i]);
+ }
+ }
+ }
} // namespace
diff --git a/c++/test/TestRleDecoder.cc b/c++/test/TestRleDecoder.cc
index 1b4ca4e89..3f2409bdb 100644
--- a/c++/test/TestRleDecoder.cc
+++ b/c++/test/TestRleDecoder.cc
@@ -2986,6 +2986,17 @@ TEST(RLEv1, testLeadingNulls) {
for (size_t i = 5; i < 10; ++i) {
EXPECT_EQ(i - 4, data[i]) << "Output wrong at " << i;
}
-};
+}
+
+TEST(RLEv1, seekOverEmptyPresentStream) {
+ const char* buffer = nullptr;
+ std::unique_ptr<RleDecoder> rle =
+ createRleDecoder(std::unique_ptr<SeekableInputStream>
+ (new SeekableArrayInputStream(buffer, 0, 1)),
+ false, RleVersion_1, *getDefaultPool());
+ std::list<uint64_t> position(2, 0);
+ PositionProvider location(position);
+ rle->seek(location);
+}
} // namespace orc