This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/main by this push:
new d1ac8f1 ORC-1137: [C++] Unroll loops and copy data directly in
DoubleColumnReader::next()
d1ac8f1 is described below
commit d1ac8f165ace0270ff96a8eece3de54312ff554d
Author: Quanlong Huang <[email protected]>
AuthorDate: Thu Mar 31 10:24:30 2022 +0800
ORC-1137: [C++] Unroll loops and copy data directly in
DoubleColumnReader::next()
This closes #1071
---
c++/src/ColumnReader.cc | 121 ++++++++++++++++++++++++++++++++++--------------
1 file changed, 87 insertions(+), 34 deletions(-)
diff --git a/c++/src/ColumnReader.cc b/c++/src/ColumnReader.cc
index c3a4b45..f4a4df9 100644
--- a/c++/src/ColumnReader.cc
+++ b/c++/src/ColumnReader.cc
@@ -412,10 +412,11 @@ namespace orc {
nanoRle->seek(positions.at(columnId));
}
+ template<TypeKind columnKind, bool isLittleEndian>
class DoubleColumnReader: public ColumnReader {
public:
DoubleColumnReader(const Type& type, StripeStreams& stripe);
- ~DoubleColumnReader() override;
+ ~DoubleColumnReader() override {}
uint64_t skip(uint64_t numValues) override;
@@ -428,8 +429,7 @@ namespace orc {
private:
std::unique_ptr<SeekableInputStream> inputStream;
- TypeKind columnKind;
- const uint64_t bytesPerValue ;
+ const uint64_t bytesPerValue = (columnKind == FLOAT) ? 4 : 8;
const char *bufferPointer;
const char *bufferEnd;
@@ -447,8 +447,24 @@ namespace orc {
double readDouble() {
int64_t bits = 0;
- for (uint64_t i=0; i < 8; i++) {
- bits |= static_cast<int64_t>(readByte()) << (i*8);
+ if (bufferEnd - bufferPointer >= 8) {
+ if (isLittleEndian) {
+ bits = *(reinterpret_cast<const int64_t*>(bufferPointer));
+ } else {
+ bits = static_cast<int64_t>(static_cast<unsigned
char>(bufferPointer[0]));
+ bits |= static_cast<int64_t>(static_cast<unsigned
char>(bufferPointer[1])) << 8;
+ bits |= static_cast<int64_t>(static_cast<unsigned
char>(bufferPointer[2])) << 16;
+ bits |= static_cast<int64_t>(static_cast<unsigned
char>(bufferPointer[3])) << 24;
+ bits |= static_cast<int64_t>(static_cast<unsigned
char>(bufferPointer[4])) << 32;
+ bits |= static_cast<int64_t>(static_cast<unsigned
char>(bufferPointer[5])) << 40;
+ bits |= static_cast<int64_t>(static_cast<unsigned
char>(bufferPointer[6])) << 48;
+ bits |= static_cast<int64_t>(static_cast<unsigned
char>(bufferPointer[7])) << 56;
+ }
+ bufferPointer += 8;
+ } else {
+ for (uint64_t i = 0; i < 8; i++) {
+ bits |= static_cast<int64_t>(readByte()) << (i * 8);
+ }
}
double *result = reinterpret_cast<double*>(&bits);
return *result;
@@ -456,32 +472,40 @@ namespace orc {
double readFloat() {
int32_t bits = 0;
- for (uint64_t i=0; i < 4; i++) {
- bits |= readByte() << (i*8);
+ if (bufferEnd - bufferPointer >= 4) {
+ if (isLittleEndian) {
+ bits = *(reinterpret_cast<const int32_t*>(bufferPointer));
+ } else {
+ bits = static_cast<unsigned char>(bufferPointer[0]);
+ bits |= static_cast<unsigned char>(bufferPointer[1]) << 8;
+ bits |= static_cast<unsigned char>(bufferPointer[2]) << 16;
+ bits |= static_cast<unsigned char>(bufferPointer[3]) << 24;
+ }
+ bufferPointer += 4;
+ } else {
+ for (uint64_t i = 0; i < 4; i++) {
+ bits |= readByte() << (i * 8);
+ }
}
float *result = reinterpret_cast<float*>(&bits);
return static_cast<double>(*result);
}
};
- DoubleColumnReader::DoubleColumnReader(const Type& type,
- StripeStreams& stripe
- ): ColumnReader(type, stripe),
- columnKind(type.getKind()),
- bytesPerValue((type.getKind() ==
- FLOAT) ? 4 : 8),
- bufferPointer(nullptr),
- bufferEnd(nullptr) {
+ template<TypeKind columnKind, bool isLittleEndian>
+ DoubleColumnReader<columnKind, isLittleEndian>::DoubleColumnReader(
+ const Type& type,
+ StripeStreams& stripe
+ ): ColumnReader(type, stripe),
+ bufferPointer(nullptr),
+ bufferEnd(nullptr) {
inputStream = stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
if (inputStream == nullptr)
throw ParseError("DATA stream not found in Double column");
}
- DoubleColumnReader::~DoubleColumnReader() {
- // PASS
- }
-
- uint64_t DoubleColumnReader::skip(uint64_t numValues) {
+ template<TypeKind columnKind, bool isLittleEndian>
+ uint64_t DoubleColumnReader<columnKind, isLittleEndian>::skip(uint64_t
numValues) {
numValues = ColumnReader::skip(numValues);
if (static_cast<size_t>(bufferEnd - bufferPointer) >=
@@ -503,9 +527,11 @@ namespace orc {
return numValues;
}
- void DoubleColumnReader::next(ColumnVectorBatch& rowBatch,
- uint64_t numValues,
- char *notNull) {
+ template<TypeKind columnKind, bool isLittleEndian>
+ void DoubleColumnReader<columnKind, isLittleEndian>::next(
+ ColumnVectorBatch& rowBatch,
+ uint64_t numValues,
+ char *notNull) {
ColumnReader::next(rowBatch, numValues, notNull);
// update the notNull from the parent class
notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
@@ -531,13 +557,33 @@ namespace orc {
}
}
} else {
- for(size_t i=0; i < numValues; ++i) {
+ // Number of values in the buffer that we can copy directly.
+ // Only viable when the machine is little-endian.
+ uint64_t bufferNum = 0;
+ if (isLittleEndian) {
+ bufferNum = std::min(numValues,
+ static_cast<size_t>(bufferEnd - bufferPointer) / bytesPerValue);
+ uint64_t bufferBytes = bufferNum * bytesPerValue;
+ memcpy(outArray, bufferPointer, bufferBytes);
+ bufferPointer += bufferBytes;
+ }
+ for (size_t i = bufferNum; i < numValues; ++i) {
outArray[i] = readDouble();
}
}
}
}
+ template<TypeKind columnKind, bool isLittleEndian>
+ void DoubleColumnReader<columnKind, isLittleEndian>::seekToRowGroup(
+ std::unordered_map<uint64_t, PositionProvider>& positions) {
+ ColumnReader::seekToRowGroup(positions);
+ inputStream->seek(positions.at(columnId));
+ // clear buffer state after seek
+ bufferEnd = nullptr;
+ bufferPointer = nullptr;
+ }
+
void readFully(char* buffer, int64_t bufferSize, SeekableInputStream*
stream) {
int64_t posn = 0;
while (posn < bufferSize) {
@@ -554,15 +600,6 @@ namespace orc {
}
}
- void DoubleColumnReader::seekToRowGroup(
- std::unordered_map<uint64_t, PositionProvider>& positions) {
- ColumnReader::seekToRowGroup(positions);
- inputStream->seek(positions.at(columnId));
- // clear buffer state after seek
- bufferEnd = nullptr;
- bufferPointer = nullptr;
- }
-
class StringDictionaryColumnReader: public ColumnReader {
private:
std::shared_ptr<StringDictionary> dictionary;
@@ -1824,6 +1861,11 @@ namespace orc {
}
}
+ static bool isLittleEndian() {
+ static union { uint32_t i; char c[4]; } num = { 0x01020304 };
+ return num.c[0] == 4;
+ }
+
/**
* Create a reader for the given stripe.
*/
@@ -1878,9 +1920,20 @@ namespace orc {
new StructColumnReader(type, stripe));
case FLOAT:
+ if (isLittleEndian()) {
+ return std::unique_ptr<ColumnReader>(
+ new DoubleColumnReader<FLOAT, true>(type, stripe));
+ }
+ return std::unique_ptr<ColumnReader>(
+ new DoubleColumnReader<FLOAT, false>(type, stripe));
+
case DOUBLE:
+ if (isLittleEndian()) {
+ return std::unique_ptr<ColumnReader>(
+ new DoubleColumnReader<DOUBLE, true>(type, stripe));
+ }
return std::unique_ptr<ColumnReader>(
- new DoubleColumnReader(type, stripe));
+ new DoubleColumnReader<DOUBLE, false>(type, stripe));
case TIMESTAMP:
return std::unique_ptr<ColumnReader>