This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/main by this push:
new 42aed71 ORC-1130:[C++] Suppress the present stream when the data stream has no null value
42aed71 is described below
commit 42aed7145444f614e3414bc29bb43378cc967003
Author: coderex2522 <[email protected]>
AuthorDate: Tue Mar 22 10:15:52 2022 +0800
ORC-1130:[C++] Suppress the present stream when the data stream has no null value
This closes #1067
---
c++/src/ByteRLE.cc | 29 +++++++++++---
c++/src/ByteRLE.hh | 5 +++
c++/src/ColumnWriter.cc | 32 +++++++++++++++-
c++/src/ColumnWriter.hh | 1 +
c++/src/io/OutputStream.cc | 4 ++
c++/src/io/OutputStream.hh | 1 +
c++/test/TestWriter.cc | 95 ++++++++++++++++++++++++++++++++++++++++++++++
7 files changed, 159 insertions(+), 8 deletions(-)
diff --git a/c++/src/ByteRLE.cc b/c++/src/ByteRLE.cc
index ee1a457..1c4a645 100644
--- a/c++/src/ByteRLE.cc
+++ b/c++/src/ByteRLE.cc
@@ -61,6 +61,13 @@ namespace orc {
virtual void recordPosition(PositionRecorder* recorder) const override;
+ virtual void suppress() override;
+
+ /**
+ * Reset to initial state
+ */
+ void reset();
+
protected:
std::unique_ptr<BufferedOutputStream> outputStream;
char* literals;
@@ -80,12 +87,7 @@ namespace orc {
std::unique_ptr<BufferedOutputStream> output)
: outputStream(std::move(output)) {
literals = new char[MAX_LITERAL_SIZE];
- numLiterals = 0;
- tailRunLength = 0;
- repeat = false;
- bufferPosition = 0;
- bufferLength = 0;
- buffer = nullptr;
+ reset();
}
ByteRleEncoderImpl::~ByteRleEncoderImpl() {
@@ -203,6 +205,21 @@ namespace orc {
recorder->add(static_cast<uint64_t>(numLiterals));
}
+  /**
+   * Return the encoder to the state it has right after construction:
+   * no pending literals or repeat run, and no current output buffer
+   * (pointer, length and write position all cleared).
+   */
+  void ByteRleEncoderImpl::reset() {
+    // run-length encoding state
+    repeat = false;
+    tailRunLength = 0;
+    numLiterals = 0;
+    // current output buffer window
+    buffer = nullptr;
+    bufferLength = 0;
+    bufferPosition = 0;
+  }
+
+  void ByteRleEncoderImpl::suppress() {
+    // Throw away everything encoded so far instead of flushing it. This is
+    // safe because the encoded bytes have only been flushed into memory, not
+    // to the final output. Afterwards the encoder starts over from scratch.
+    // NOTE(review): this relies on outputStream->suppress() discarding every
+    // byte the stream has buffered; a BufferedOutputStream subclass that keeps
+    // extra pending data outside dataBuffer (e.g. a compressing stream) would
+    // need its own suppress() override -- please confirm.
+    outputStream->suppress();
+    reset();
+  }
+
std::unique_ptr<ByteRleEncoder> createByteRleEncoder
(std::unique_ptr<BufferedOutputStream> output) {
return std::unique_ptr<ByteRleEncoder>(new ByteRleEncoderImpl
diff --git a/c++/src/ByteRLE.hh b/c++/src/ByteRLE.hh
index 71ca579..2f6e2eb 100644
--- a/c++/src/ByteRLE.hh
+++ b/c++/src/ByteRLE.hh
@@ -55,6 +55,11 @@ namespace orc {
* @param recorder use the recorder to record current positions
*/
virtual void recordPosition(PositionRecorder* recorder) const = 0;
+
+ /**
+ * suppress the data and reset to initial state
+ */
+ virtual void suppress() = 0;
};
class ByteRleDecoder {
diff --git a/c++/src/ColumnWriter.cc b/c++/src/ColumnWriter.cc
index fd77d70..559ca1a 100644
--- a/c++/src/ColumnWriter.cc
+++ b/c++/src/ColumnWriter.cc
@@ -100,7 +100,8 @@ namespace orc {
enableBloomFilter(false),
memPool(*options.getMemoryPool()),
indexStream(),
- bloomFilterStream() {
+ bloomFilterStream(),
+ hasNullValue(false) {
std::unique_ptr<BufferedOutputStream> presentStream =
factory.createStream(proto::Stream_Kind_PRESENT);
@@ -139,10 +140,22 @@ namespace orc {
uint64_t offset,
uint64_t numValues,
const char* incomingMask) {
- notNullEncoder->add(batch.notNull.data() + offset, numValues,
incomingMask);
+ const char* notNull = batch.notNull.data() + offset;
+ notNullEncoder->add(notNull, numValues, incomingMask);
+ hasNullValue |= batch.hasNulls;
+ for (uint64_t i = 0; !hasNullValue && i < numValues; ++i) {
+ if (!notNull[i]) {
+ hasNullValue = true;
+ }
+ }
}
void ColumnWriter::flush(std::vector<proto::Stream>& streams) {
+ if (!hasNullValue) {
+ // supress the present stream
+ notNullEncoder->suppress();
+ return;
+ }
proto::Stream stream;
stream.set_kind(proto::Stream_Kind_PRESENT);
stream.set_column(static_cast<uint32_t>(columnId));
@@ -199,6 +212,21 @@ namespace orc {
}
void ColumnWriter::writeIndex(std::vector<proto::Stream> &streams) const {
+ if (!hasNullValue) {
+ // remove positions of present stream
+ int presentCount = indexStream->isCompressed() ? 4 : 3;
+ for (int i = 0; i != rowIndex->entry_size(); ++i) {
+ proto::RowIndexEntry* entry = rowIndex->mutable_entry(i);
+ std::vector<uint64_t> positions;
+ for (int j = presentCount; j < entry->positions_size(); ++j) {
+ positions.push_back(entry->positions(j));
+ }
+ entry->clear_positions();
+ for (size_t j = 0; j != positions.size(); ++j) {
+ entry->add_positions(positions[j]);
+ }
+ }
+ }
// write row index to output stream
rowIndex->SerializeToZeroCopyStream(indexStream.get());
diff --git a/c++/src/ColumnWriter.hh b/c++/src/ColumnWriter.hh
index cbbb5d0..2098377 100644
--- a/c++/src/ColumnWriter.hh
+++ b/c++/src/ColumnWriter.hh
@@ -207,6 +207,7 @@ namespace orc {
MemoryPool& memPool;
std::unique_ptr<BufferedOutputStream> indexStream;
std::unique_ptr<BufferedOutputStream> bloomFilterStream;
+ bool hasNullValue;
};
/**
diff --git a/c++/src/io/OutputStream.cc b/c++/src/io/OutputStream.cc
index fd71c4b..8770d28 100644
--- a/c++/src/io/OutputStream.cc
+++ b/c++/src/io/OutputStream.cc
@@ -97,6 +97,10 @@ namespace orc {
return dataSize;
}
+  /**
+   * Discard all bytes currently held in the in-memory data buffer without
+   * writing them anywhere; later writes start again from an empty buffer.
+   */
+  void BufferedOutputStream::suppress() {
+    dataBuffer->resize(0);
+  }
+
void AppendOnlyBufferedStream::write(const char * data, size_t size) {
size_t dataOffset = 0;
while (size > 0) {
diff --git a/c++/src/io/OutputStream.hh b/c++/src/io/OutputStream.hh
index 6d04629..69c06d5 100644
--- a/c++/src/io/OutputStream.hh
+++ b/c++/src/io/OutputStream.hh
@@ -62,6 +62,7 @@ namespace orc {
virtual std::string getName() const;
virtual uint64_t getSize() const;
virtual uint64_t flush();
+ virtual void suppress();
virtual bool isCompressed() const { return false; }
};
diff --git a/c++/test/TestWriter.cc b/c++/test/TestWriter.cc
index c2e50c1..8df2c22 100644
--- a/c++/test/TestWriter.cc
+++ b/c++/test/TestWriter.cc
@@ -21,6 +21,7 @@
#include "MemoryInputStream.hh"
#include "MemoryOutputStream.hh"
+#include "Reader.hh"
#include "wrap/gmock.h"
#include "wrap/gtest-wrapper.h"
@@ -1901,5 +1902,99 @@ namespace orc {
}
}
+  /**
+   * Writes a file where col1 contains nulls (every even row) and col2 has no
+   * nulls, then checks that:
+   *  - all values read back correctly, both sequentially and after seeking
+   *    into the second row group (which uses the row index positions);
+   *  - the stripe footer contains exactly one PRESENT stream, for col1.
+   */
+  TEST(WriterTest, testSuppressPresentStream) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool* pool = getDefaultPool();
+    size_t rowCount = 2000;
+    {
+      auto type = std::unique_ptr<Type>(
+        Type::buildTypeFromString("struct<col1:int,col2:int>"));
+      WriterOptions options;
+      options.setStripeSize(1024 * 1024)
+        .setCompressionBlockSize(1024)
+        .setCompression(CompressionKind_NONE)
+        .setMemoryPool(pool)
+        .setRowIndexStride(1000);
+
+      auto writer = createWriter(*type, &memStream, options);
+      auto batch = writer->createRowBatch(rowCount);
+      auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch);
+      auto& longBatch1 =
+        dynamic_cast<LongVectorBatch&>(*structBatch.fields[0]);
+      auto& longBatch2 =
+        dynamic_cast<LongVectorBatch&>(*structBatch.fields[1]);
+      structBatch.numElements = rowCount;
+      longBatch1.numElements = rowCount;
+      longBatch2.numElements = rowCount;
+      // only col1 carries nulls; col2 is left without any null value
+      longBatch1.hasNulls = true;
+      for (size_t i = 0; i < rowCount; ++i) {
+        if (i % 2 == 0) {
+          longBatch1.notNull[i] = 0;
+        } else {
+          longBatch1.notNull[i] = 1;
+          longBatch1.data[i] = static_cast<int64_t>(i*100);
+        }
+        longBatch2.data[i] = static_cast<int64_t>(i*300);
+      }
+      writer->add(*batch);
+      writer->close();
+    }
+    // read file & check the present stream
+    {
+      std::unique_ptr<InputStream> inStream(
+        new MemoryInputStream(memStream.getData(), memStream.getLength()));
+      ReaderOptions readerOptions;
+      readerOptions.setMemoryPool(*pool);
+      std::unique_ptr<Reader> reader =
+        createReader(std::move(inStream), readerOptions);
+      EXPECT_EQ(rowCount, reader->getNumberOfRows());
+      std::unique_ptr<RowReader> rowReader = createRowReader(reader.get());
+      auto batch = rowReader->createRowBatch(1000);
+      EXPECT_TRUE(rowReader->next(*batch));
+      EXPECT_EQ(1000, batch->numElements);
+      auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch);
+      auto& longBatch1 =
+        dynamic_cast<LongVectorBatch&>(*structBatch.fields[0]);
+      auto& longBatch2 =
+        dynamic_cast<LongVectorBatch&>(*structBatch.fields[1]);
+      for (size_t i = 0; i < 1000; ++i) {
+        if (i % 2 == 0) {
+          EXPECT_FALSE(longBatch1.notNull[i]);
+        } else {
+          EXPECT_TRUE(longBatch1.notNull[i]);
+          EXPECT_EQ(longBatch1.data[i], static_cast<int64_t>(i*100));
+        }
+        EXPECT_EQ(longBatch2.data[i], static_cast<int64_t>(i*300));
+      }
+      // Read rows 1500 - 2000; seeking goes through the row index positions
+      rowReader->seekToRow(1500);
+      EXPECT_TRUE(rowReader->next(*batch));
+      EXPECT_EQ(500, batch->numElements);
+      for (size_t i = 0; i < 500; ++i) {
+        if (i % 2 == 0) {
+          EXPECT_FALSE(longBatch1.notNull[i]);
+        } else {
+          EXPECT_TRUE(longBatch1.notNull[i]);
+          EXPECT_EQ(longBatch1.data[i], static_cast<int64_t>((i + 1500)*100));
+        }
+        EXPECT_EQ(longBatch2.data[i], static_cast<int64_t>((i + 1500)*300));
+      }
+      // fetch StripeFooter from pb stream
+      std::unique_ptr<StripeInformation> stripeInfo = reader->getStripe(0);
+      ReaderImpl* readerImpl = dynamic_cast<ReaderImpl*>(reader.get());
+      ASSERT_TRUE(readerImpl != nullptr);
+      std::unique_ptr<SeekableInputStream> pbStream(
+        new SeekableFileInputStream(readerImpl->getStream(),
+                                    stripeInfo->getOffset() +
+                                      stripeInfo->getIndexLength() +
+                                      stripeInfo->getDataLength(),
+                                    stripeInfo->getFooterLength(),
+                                    *pool));
+      proto::StripeFooter stripeFooter;
+      ASSERT_TRUE(stripeFooter.ParseFromZeroCopyStream(pbStream.get()))
+        << "Parse stripe footer from pb stream failed";
+      // Exactly one PRESENT stream must exist and it must belong to col1
+      // (column id 1). Counting makes the check fail if no PRESENT stream
+      // was written at all, instead of passing vacuously.
+      int presentStreamCount = 0;
+      for (int i = 0; i < stripeFooter.streams_size(); ++i) {
+        const proto::Stream& stream = stripeFooter.streams(i);
+        if (stream.has_kind() && stream.kind() == proto::Stream_Kind_PRESENT) {
+          ++presentStreamCount;
+          EXPECT_EQ(stream.column(), 1UL);
+        }
+      }
+      EXPECT_EQ(1, presentStreamCount);
+    }
+  }
+
INSTANTIATE_TEST_CASE_P(OrcTest, WriterTest, Values(FileVersion::v_0_11(),
FileVersion::v_0_12()));
}