This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch orc
in repository https://gitbox.apache.org/repos/asf/doris-thirdparty.git
The following commit(s) were added to refs/heads/orc by this push:
new db01184f765 [opt] support orc merge multi stripes io (#244)
db01184f765 is described below
commit db01184f765c03496e4107bd3ac37c077ac4bc5f
Author: daidai <[email protected]>
AuthorDate: Fri Oct 18 23:17:33 2024 +0800
[opt] support orc merge multi stripes io (#244)
---
c++/include/orc/Reader.hh | 8 +++++-
c++/src/Reader.cc | 71 +++++++++++++++++++++++++++++++++++++----------
c++/src/Reader.hh | 9 +++++-
3 files changed, 71 insertions(+), 17 deletions(-)
diff --git a/c++/include/orc/Reader.hh b/c++/include/orc/Reader.hh
index 5843d88c059..c9be47e0d8b 100644
--- a/c++/include/orc/Reader.hh
+++ b/c++/include/orc/Reader.hh
@@ -40,7 +40,7 @@ namespace orc {
// classes that hold data members so we can maintain binary compatibility
struct ReaderOptionsPrivate;
struct RowReaderOptionsPrivate;
-
+ class InputStream;
/**
* Expose the reader metrics including the latency and
* number of calls of the decompression/decoding/IO modules.
@@ -633,6 +633,12 @@ namespace orc {
*/
virtual std::map<uint32_t, BloomFilterIndex> getBloomFilters(
uint32_t stripeIndex, const std::set<uint32_t>& included) const = 0;
+
+ virtual InputStream* getStream() const = 0;
+
+ virtual void setStream(std::unique_ptr<InputStream>) = 0;
+
+ virtual std::vector<int> getNeedReadStripes(const RowReaderOptions& opts)
= 0;
};
/**
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index 80a5cfd4b7d..eddeeee0b05 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -306,7 +306,8 @@ namespace orc {
column_selector.updateSelected(selectedColumns, opts);
// prepare SargsApplier if SearchArgument is available
- if (opts.getSearchArgument() && footer->rowindexstride() > 0) {
+ sargsApplier = std::move(contents->sargsApplier);
+ if (sargsApplier == nullptr && opts.getSearchArgument() &&
footer->rowindexstride() > 0) {
sargs = opts.getSearchArgument();
sargsApplier.reset(new SargsApplier(*contents->schema, sargs.get(),
footer->rowindexstride(),
getWriterVersionImpl(_contents.get()),
@@ -917,6 +918,37 @@ namespace orc {
return std::make_unique<RowReaderImpl>(contents, opts, filter,
stringDictFilter);
}
+ std::vector<int> ReaderImpl::getNeedReadStripes(const RowReaderOptions&
opts) { // Returns one flag per stripe: 1 = stripe must be read, 0 = pruned by stripe statistics.
+ if (opts.getSearchArgument() && !isMetadataLoaded) {
+ // load stripe statistics for PPD
+ readMetadata();
+ }
+
+ std::vector<int> allStripesNeeded(numberOfStripes,1); // default: every stripe is read
+
+ if (opts.getSearchArgument() && footer->rowindexstride() > 0) {
+ auto sargs = opts.getSearchArgument(); // NOTE(review): local copy; the applier only receives sargs.get(), so the SearchArgument must be kept alive elsewhere (presumably by opts) - confirm.
+ sargsApplier.reset(new SargsApplier(*contents->schema, sargs.get(), // replaces any applier still held by this ReaderImpl
footer->rowindexstride(),
+ getWriterVersionImpl(contents.get()),
+ contents->readerMetrics));
+
+ if (sargsApplier == nullptr || contents->metadata == nullptr) { // NOTE(review): sargsApplier was just assigned a new object above, so only the metadata check can fire.
+ return allStripesNeeded; // no stripe statistics: keep every stripe; the applier is not handed to contents on this path
+ }
+
+ for ( uint64_t currentStripeIndex = 0;currentStripeIndex <
numberOfStripes ; currentStripeIndex ++) {
+ const auto& currentStripeStats =
+
contents->metadata->stripestats(static_cast<int>(currentStripeIndex));
+ // No metrics are recorded here, so 0 is passed as the row-group count.
+ allStripesNeeded[currentStripeIndex] =
sargsApplier->evaluateStripeStatistics(currentStripeStats, 0);; // bool result stored as 0/1; NOTE(review): stray extra ';'
+ }
+ contents->sargsApplier = std::move(sargsApplier); // RowReaderImpl's constructor moves this out and skips building its own; NOTE(review): it was built from THIS call's opts - confirm callers pass the same RowReaderOptions to createRowReader.
+ }
+ return allStripesNeeded;
+ }
+
+
+
uint64_t maxStreamsForType(const proto::Type& type) {
switch (static_cast<int64_t>(type.kind())) {
case proto::Type_Kind_STRUCT:
@@ -1126,29 +1158,38 @@ namespace orc {
<< ", footerLength=" << currentStripeInfo.footerlength() << ")";
throw ParseError(msg.str());
}
- currentStripeFooter = getStripeFooter(currentStripeInfo,
*contents.get());
- rowsInCurrentStripe = currentStripeInfo.numberofrows();
- processingStripe = currentStripe;
-
- std::unique_ptr<StripeInformation> currentStripeInformation(new
StripeInformationImpl(
- currentStripeInfo.offset(), currentStripeInfo.indexlength(),
- currentStripeInfo.datalength(), currentStripeInfo.footerlength(),
- currentStripeInfo.numberofrows(), contents->stream.get(),
*contents->pool,
- contents->compression, contents->blockSize,
contents->readerMetrics));
- contents->stream->beforeReadStripe(std::move(currentStripeInformation),
selectedColumns);
if (sargsApplier) {
bool isStripeNeeded = true;
if (contents->metadata) {
- const auto& currentStripeStats =
- contents->metadata->stripestats(static_cast<int>(currentStripe));
+ const auto &currentStripeStats =
+
contents->metadata->stripestats(static_cast<int>(currentStripe));
// skip this stripe after stats fail to satisfy sargs
uint64_t stripeRowGroupCount =
- (rowsInCurrentStripe + footer->rowindexstride() - 1) /
footer->rowindexstride();
+ (rowsInCurrentStripe + footer->rowindexstride() - 1) /
footer->rowindexstride();
isStripeNeeded =
- sargsApplier->evaluateStripeStatistics(currentStripeStats,
stripeRowGroupCount);
+ sargsApplier->evaluateStripeStatistics(currentStripeStats,
stripeRowGroupCount);
}
+ if (!isStripeNeeded) {
+ // advance to next stripe when current stripe has no matching rows
+ currentStripe += 1;
+ currentRowInStripe = 0;
+ continue;
+ }
+ }
+ currentStripeFooter = getStripeFooter(currentStripeInfo,
*contents.get());
+ rowsInCurrentStripe = currentStripeInfo.numberofrows();
+ processingStripe = currentStripe;
+ std::unique_ptr<StripeInformation> currentStripeInformation(new
StripeInformationImpl(
+ currentStripeInfo.offset(), currentStripeInfo.indexlength(),
+ currentStripeInfo.datalength(), currentStripeInfo.footerlength(),
+ currentStripeInfo.numberofrows(), contents->stream.get(),
*contents->pool,
+ contents->compression, contents->blockSize,
contents->readerMetrics));
+ contents->stream->beforeReadStripe(std::move(currentStripeInformation),
selectedColumns);
+
+ if (sargsApplier) {
+ bool isStripeNeeded = true;
if (isStripeNeeded) {
// read row group statistics and bloom filters of current stripe
loadStripeIndex();
diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh
index c0f891ef27c..9505022c558 100644
--- a/c++/src/Reader.hh
+++ b/c++/src/Reader.hh
@@ -100,6 +100,7 @@ namespace orc {
bool isDecimalAsLong;
std::unique_ptr<proto::Metadata> metadata;
ReaderMetrics* readerMetrics;
+ std::unique_ptr<SargsApplier> sargsApplier;
};
proto::StripeFooter getStripeFooter(const proto::StripeInformation& info,
@@ -314,6 +315,8 @@ namespace orc {
// footer
proto::Footer* footer;
uint64_t numberOfStripes;
+ std::unique_ptr<SargsApplier> sargsApplier;
+ std::vector<int> getNeedReadStripes(const RowReaderOptions& opts) override;
uint64_t getMemoryUse(int stripeIx, std::vector<bool>& selectedColumns);
// internal methods
@@ -418,10 +421,14 @@ namespace orc {
return contents->schema.get();
}
- InputStream* getStream() const {
+ InputStream* getStream() const override {
return contents->stream.get(); // non-owning pointer; ownership stays with contents->stream
}
+ void setStream(std::unique_ptr<InputStream> inputStreamUPtr) override{ // takes ownership of the new stream; the previously owned stream is destroyed
+ contents->stream = std::move(inputStreamUPtr); // NOTE(review): objects built from contents->stream.get() (e.g. StripeInformationImpl) may be left dangling - confirm this is only called before any RowReader is created.
+ }
+
uint64_t getMemoryUse(int stripeIx = -1) override;
uint64_t getMemoryUseByFieldId(const std::list<uint64_t>& include, int
stripeIx = -1) override;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]