This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch orc
in repository https://gitbox.apache.org/repos/asf/doris-thirdparty.git
The following commit(s) were added to refs/heads/orc by this push:
new 13ada78b494 [fix] Fix complex type late materialization. (#311)
13ada78b494 is described below
commit 13ada78b494133cacc0ccb5120e3f4611828fdbb
Author: Qi Chen <[email protected]>
AuthorDate: Mon Apr 28 12:23:22 2025 +0800
[fix] Fix complex type late materialization. (#311)
---
c++/include/orc/Type.hh | 46 ++++++++++++++++++++++++++++++++++++++++++++--
c++/src/ColumnReader.cc | 21 ++++++++++++++++++---
c++/src/Reader.cc | 16 ++++++++++++----
c++/src/TypeImpl.cc | 4 ++--
4 files changed, 76 insertions(+), 11 deletions(-)
diff --git a/c++/include/orc/Type.hh b/c++/include/orc/Type.hh
index 1f0901e31de..a8efe1c9815 100644
--- a/c++/include/orc/Type.hh
+++ b/c++/include/orc/Type.hh
@@ -26,15 +26,57 @@
#include "orc/orc-config.hh"
namespace orc {
+ /**
+ * Filter Node Types:
+ *
+ * FILTER_CHILD: Primitive type that is a filter column.
+ * FILTER_PARENT: Compound type that may contain both filter and non-filter children.
+ * FILTER_COMPOUND_ELEMENT: Compound type that is a filter element (list/map).
+ * The entire column will be read, and must have only filter children.
+ * NON_FILTER: Non-filter column.
+ *
+ * Example:
+ * struct<name:string,
+ * age:int,
+ * address:struct<city:string,
+ * zip:int,
+ * location:struct<latitude:double, longitude:double>>,
+ * hobbies:list<struct<name:string, level:int>>,
+ * scores:map<string, struct<subject:string, grade:int>>>
+ *
+ * Filter columns: name, address.city, address.location.latitude, hobbies, scores
+ *
+ * Column Structure:
+ * struct<...>
+ * ├── name (FILTER_CHILD) # Primitive type, filter column
+ * ├── age (NON_FILTER) # Non-filter column
+ * ├── address (FILTER_PARENT) # Compound type with filter children
+ * │ ├── city (FILTER_CHILD) # Primitive type, filter column
+ * │ ├── zip (NON_FILTER) # Non-filter column
+ * │ └── location (FILTER_PARENT) # Compound type with filter children
+ * │ ├── latitude (FILTER_CHILD) # Primitive type, filter column
+ * │ └── longitude (NON_FILTER) # Non-filter column
+ * ├── hobbies (FILTER_COMPOUND_ELEMENT) # Compound type as filter element (list)
+ * │ └── struct<name:string, level:int> (FILTER_PARENT) # Compound type with filter children
+ * │ ├── name (FILTER_CHILD) # Primitive type, filter column
+ * │ └── level (FILTER_CHILD) # Primitive type, filter column
+ * └── scores (FILTER_COMPOUND_ELEMENT) # Compound type as filter element (map)
+ * ├── key (FILTER_CHILD) # Primitive type, filter column
+ * └── value (FILTER_PARENT) # Compound type with filter children
+ * ├── subject (FILTER_CHILD) # Primitive type, filter column
+ * └── grade (FILTER_CHILD) # Primitive type, filter column
+ */
enum class ReaderCategory {
FILTER_CHILD, // Primitive type that is a filter column
- FILTER_PARENT, // Compound type with filter children
+ FILTER_PARENT, // Compound type that may contain both filter and non-filter children
+ FILTER_COMPOUND_ELEMENT, // Compound type that is a filter element (list/map).
+ // The entire column will be read, and must have only filter children.
NON_FILTER // Non-filter column
};
class ReadPhase {
public:
- static const int NUM_CATEGORIES = 3; // Number of values in ReaderCategory
+ static const int NUM_CATEGORIES = 4; // Number of values in ReaderCategory
std::bitset<NUM_CATEGORIES> categories;
static const ReadPhase ALL;
diff --git a/c++/src/ColumnReader.cc b/c++/src/ColumnReader.cc
index 875ce81a9de..4dde99917c0 100644
--- a/c++/src/ColumnReader.cc
+++ b/c++/src/ColumnReader.cc
@@ -1158,7 +1158,9 @@ namespace orc {
}
uint64_t StructColumnReader::skip(uint64_t numValues, const ReadPhase&
readPhase) {
- numValues = ColumnReader::skip(numValues, readPhase);
+ if (readPhase.contains(this->type.getReaderCategory())) {
+ numValues = ColumnReader::skip(numValues, readPhase);
+ }
for (auto& ptr : children) {
if (shouldProcessChild(ptr->getType().getReaderCategory(), readPhase)) {
ptr->skip(numValues, readPhase);
@@ -1183,7 +1185,9 @@ namespace orc {
void StructColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t
numValues,
char* notNull, const ReadPhase&
readPhase,
uint16_t* sel_rowid_idx, size_t
sel_size) {
- ColumnReader::next(rowBatch, numValues, notNull, readPhase, sel_rowid_idx,
sel_size);
+ if (readPhase.contains(this->type.getReaderCategory())) {
+ ColumnReader::next(rowBatch, numValues, notNull, readPhase,
sel_rowid_idx, sel_size);
+ }
uint64_t i = 0;
notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
for (auto iter = children.begin(); iter != children.end(); ++iter, ++i) {
@@ -1201,7 +1205,9 @@ namespace orc {
void StructColumnReader::seekToRowGroup(std::unordered_map<uint64_t,
PositionProvider>& positions,
const ReadPhase& readPhase) {
- ColumnReader::seekToRowGroup(positions, readPhase);
+ if (readPhase.contains(this->type.getReaderCategory())) {
+ ColumnReader::seekToRowGroup(positions, readPhase);
+ }
for (auto& ptr : children) {
if (shouldProcessChild(ptr->getType().getReaderCategory(), readPhase)) {
@@ -1579,6 +1585,9 @@ namespace orc {
}
uint64_t UnionColumnReader::skip(uint64_t numValues, const ReadPhase&
readPhase) {
+ if (!readPhase.contains(this->type.getReaderCategory())) {
+ throw NotImplementedYet("Not implemented yet");
+ }
numValues = ColumnReader::skip(numValues, readPhase);
const uint64_t BUFFER_SIZE = 1024;
char buffer[BUFFER_SIZE];
@@ -1618,6 +1627,9 @@ namespace orc {
void UnionColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t
numValues,
char* notNull, const ReadPhase&
readPhase,
uint16_t* sel_rowid_idx, size_t
sel_size) {
+ if (!readPhase.contains(this->type.getReaderCategory())) {
+ throw NotImplementedYet("Not implemented yet");
+ }
ColumnReader::next(rowBatch, numValues, notNull, readPhase);
UnionVectorBatch& unionBatch = dynamic_cast<UnionVectorBatch&>(rowBatch);
uint64_t* offsets = unionBatch.offsets.data();
@@ -1655,6 +1667,9 @@ namespace orc {
void UnionColumnReader::seekToRowGroup(std::unordered_map<uint64_t,
PositionProvider>& positions,
const ReadPhase& readPhase) {
+ if (!readPhase.contains(this->type.getReaderCategory())) {
+ throw NotImplementedYet("Not implemented yet");
+ }
ColumnReader::seekToRowGroup(positions, readPhase);
rle->seek(positions.at(columnId));
for (size_t i = 0; i < numChildren; ++i) {
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index 2cfd6417c5b..0f4cf4d740c 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -337,6 +337,9 @@ namespace orc {
while (current != nullptr) {
if (current->getSubtypeCount() == 0) {
current->setReaderCategory(ReaderCategory::FILTER_CHILD);
+ } else if (current->getKind() == TypeKind::LIST
+ || current->getKind() == TypeKind::MAP) {
+
current->setReaderCategory(ReaderCategory::FILTER_COMPOUND_ELEMENT);
} else {
current->setReaderCategory(ReaderCategory::FILTER_PARENT);
}
@@ -345,7 +348,7 @@ namespace orc {
}
// Process all child nodes of the current node
- // For child nodes: set FILTER_PARENT if it's a leaf, FILTER_CHILD if it has children
+ // For child nodes: set FILTER_CHILD if it's a leaf, FILTER_PARENT if it has children
std::function<void(Type*)> processChildren = [&processChildren](Type*
node) {
if (node == nullptr) return;
@@ -354,16 +357,20 @@ namespace orc {
Type* child = node->getSubtype(i);
if (child->getSubtypeCount() == 0) {
// Leaf node (no children)
- child->setReaderCategory(ReaderCategory::FILTER_PARENT);
+ child->setReaderCategory(ReaderCategory::FILTER_CHILD);
+ } else if (child->getKind() == TypeKind::LIST
+ || child->getKind() == TypeKind::MAP) {
+
child->setReaderCategory(ReaderCategory::FILTER_COMPOUND_ELEMENT);
+ // Recursively process its children
+ processChildren(child);
} else {
// Non-leaf node (has children)
- child->setReaderCategory(ReaderCategory::FILTER_CHILD);
+ child->setReaderCategory(ReaderCategory::FILTER_PARENT);
// Recursively process its children
processChildren(child);
}
}
};
-
processChildren(type);
}
@@ -1341,6 +1348,7 @@ namespace orc {
prepareFollowReaders(footer->rowindexstride(),
currentRowInStripe, followRowInStripe),
sel_rowid_idx.get(), arg);
followRowInStripe = currentRowInStripe + rowsToRead;
+ data.numElements = rowsToRead;
}
} else {
nextBatch(data, rowsToRead, startReadPhase, nullptr, arg);
diff --git a/c++/src/TypeImpl.cc b/c++/src/TypeImpl.cc
index a5708951929..a85e32bcd1f 100644
--- a/c++/src/TypeImpl.cc
+++ b/c++/src/TypeImpl.cc
@@ -26,9 +26,9 @@
namespace orc {
const ReadPhase ReadPhase::ALL = ReadPhase::fromCategories(
- {ReaderCategory::FILTER_CHILD, ReaderCategory::FILTER_PARENT,
ReaderCategory::NON_FILTER});
+ {ReaderCategory::FILTER_CHILD, ReaderCategory::FILTER_PARENT,
ReaderCategory::FILTER_COMPOUND_ELEMENT, ReaderCategory::NON_FILTER});
const ReadPhase ReadPhase::LEADERS =
- ReadPhase::fromCategories({ReaderCategory::FILTER_CHILD,
ReaderCategory::FILTER_PARENT});
+ ReadPhase::fromCategories({ReaderCategory::FILTER_CHILD,
ReaderCategory::FILTER_COMPOUND_ELEMENT, ReaderCategory::FILTER_PARENT});
const ReadPhase ReadPhase::FOLLOWERS =
ReadPhase::fromCategories({ReaderCategory::NON_FILTER});
const ReadPhase ReadPhase::LEADER_PARENTS =
ReadPhase::fromCategories({ReaderCategory::FILTER_PARENT});
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]