This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch clucene
in repository https://gitbox.apache.org/repos/asf/doris-thirdparty.git
The following commit(s) were added to refs/heads/clucene by this push:
new 58b692c [Update] add ram balance for SDocumentWriter (#54)
58b692c is described below
commit 58b692c1726a7d1592ec4fad511a5e6f65de09c7
Author: airborne12 <[email protected]>
AuthorDate: Fri Apr 14 14:04:07 2023 +0800
[Update] add ram balance for SDocumentWriter (#54)
---
src/core/CLucene/index/SDocumentWriter.cpp | 95 ++++++++++++++++++++++++++++++
src/core/CLucene/index/SDocumentWriter.h | 36 +++++++++--
2 files changed, 126 insertions(+), 5 deletions(-)
diff --git a/src/core/CLucene/index/SDocumentWriter.cpp
b/src/core/CLucene/index/SDocumentWriter.cpp
index 3b6e66f..57b2d09 100644
--- a/src/core/CLucene/index/SDocumentWriter.cpp
+++ b/src/core/CLucene/index/SDocumentWriter.cpp
@@ -616,6 +616,8 @@ void
SDocumentsWriter<T>::ThreadState::processDocument(Analyzer *sanalyzer) {
// We process the document one field at a time
for (int32_t i = 0; i < numFields; i++)
fieldDataArray[i]->processField(sanalyzer);
+ if (_parent->ramBufferSize != -1 && _parent->numBytesUsed > 0.95 *
_parent->ramBufferSize)
+ _parent->balanceRAM();
}
template<typename T>
@@ -834,6 +836,7 @@ char *SDocumentsWriter<char>::getSCharBlock() {
char *c;
if (0 == size) {
numBytesAlloc += CHAR_BLOCK_SIZE * SCHAR_NUM_BYTE;
+ balanceRAM();
c = _CL_NEWARRAY(char, CHAR_BLOCK_SIZE);
memset(c, 0, sizeof(char) * CHAR_BLOCK_SIZE);
} else {
@@ -874,6 +877,7 @@ void SDocumentsWriter<T>::getPostings(ValueArray<Posting *>
&postings) {
if (newPostingsAllocCount > this->postingsFreeListDW.length)
this->postingsFreeListDW.resize((int32_t) (1.25 *
newPostingsAllocCount));
+ balanceRAM();
for (size_t i = numToCopy; i < postings.length; i++) {
postings.values[i] = _CLNEW Posting();
numBytesAlloc += POSTING_NUM_BYTE;
@@ -1248,6 +1252,7 @@ uint8_t *SDocumentsWriter<T>::getByteBlock(bool
trackAllocations) {
uint8_t *b;
if (0 == size) {
numBytesAlloc += BYTE_BLOCK_SIZE;
+ balanceRAM();
b = _CL_NEWARRAY(uint8_t, BYTE_BLOCK_SIZE);
memset(b, 0, sizeof(uint8_t) * BYTE_BLOCK_SIZE);
} else {
@@ -1340,6 +1345,96 @@ void SDocumentsWriter<T>::BufferedNorms::fill(int32_t
docID) {
upto = docID;
}
}
+// Keeps RAM near ramBufferSize: trims the free pools when over-allocated, and sets bufferIsFull (requesting a flush) when usage exceeds the budget or nothing is left to free.
+template<typename T>
+void SDocumentsWriter<T>::balanceRAM() {
+ if (ramBufferSize == IndexWriter::DISABLE_AUTO_FLUSH || bufferIsFull)
+ return;
+
+ // Trim the free pools once allocations exceed 105% of the budget,
+ // trimming back down to 95% of it.
+ const int64_t freeTrigger = (int64_t) (1.05 * ramBufferSize);
+ const int64_t freeLevel = (int64_t) (0.95 * ramBufferSize);
+
+ // Request a flush once bytes actually in use exceed 100% of the budget.
+ const int64_t flushTrigger = (int64_t) ramBufferSize;
+
+ if (numBytesAlloc > freeTrigger) {
+ if (infoStream != NULL)
+ (*infoStream) << string(" RAM: now balance allocations: usedMB=")
<< toMB(numBytesUsed) + string(" vs trigger=") << toMB(flushTrigger) <<
string(" allocMB=") << toMB(numBytesAlloc) << string(" vs trigger=") <<
toMB(freeTrigger) << string(" postingsFree=") << toMB(this->postingsFreeCountDW
* POSTING_NUM_BYTE) << string(" byteBlockFree=") << toMB(freeByteBlocks.size()
* BYTE_BLOCK_SIZE) << string(" charBlockFree=") << toMB(freeSCharBlocks.size()
* CHAR_BLOCK_SIZE * CHAR_NUM_BY [...]
+
+ // Over-allocated (numBytesAlloc > freeTrigger, i.e. 105%): release
+ // pooled objects until numBytesAlloc drops to freeLevel (95%). The
+ // starting value is kept only for the "after free" log message.
+ const int64_t startBytesAlloc = numBytesAlloc;
+
+ const int32_t postingsFreeChunk = (int32_t) (BYTE_BLOCK_SIZE /
POSTING_NUM_BYTE);
+
+ int32_t iter = 0;
+
+ // Round-robin over the pools (iter % 3): one byte block, one char
+ // block, or up to postingsFreeChunk postings per step, until
+ // numBytesAlloc <= freeLevel or every pool is empty.
+
+ while (numBytesAlloc > freeLevel) {
+ if (0 == freeByteBlocks.size() && 0 == freeSCharBlocks.size() && 0
== this->postingsFreeCountDW) {
+ // Every pool is empty -- nothing left to free, so request a flush.
+ bufferIsFull = true;
+ if (infoStream != NULL)
+ (*infoStream) << string(" nothing to free; now set
bufferIsFull\n");
+ break;
+ }
+ // NOTE(review): confirm remove() deletes the block; otherwise this only un-counts it (a leak).
+ if ((0 == iter % 3) && freeByteBlocks.size() > 0) {
+ freeByteBlocks.remove(freeByteBlocks.size() - 1);
+ numBytesAlloc -= BYTE_BLOCK_SIZE;
+ }
+ // NOTE(review): getSCharBlock() adds CHAR_BLOCK_SIZE * SCHAR_NUM_BYTE; confirm CHAR_NUM_BYTE here is the same size.
+ if ((1 == iter % 3) && freeSCharBlocks.size() > 0) {
+ freeSCharBlocks.remove(freeSCharBlocks.size() - 1);
+ numBytesAlloc -= CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;
+ }
+
+ if ((2 == iter % 3) && this->postingsFreeCountDW > 0) {
+ int32_t numToFree;
+ if (this->postingsFreeCountDW >= postingsFreeChunk)
+ numToFree = postingsFreeChunk;
+ else
+ numToFree = this->postingsFreeCountDW;
+ for (size_t i = this->postingsFreeCountDW - numToFree; i <
this->postingsFreeCountDW; i++) {
+ _CLDELETE(this->postingsFreeListDW.values[i]);
+ }
+ this->postingsFreeCountDW -= numToFree;
+ this->postingsAllocCountDW -= numToFree;
+ numBytesAlloc -= numToFree * POSTING_NUM_BYTE;
+ }
+
+ iter++;
+ }
+
+ if (infoStream != NULL) {
+ (*infoStream) << " after free: freedMB=" +
Misc::toString((float_t) ((startBytesAlloc - numBytesAlloc) / 1024.0 / 1024.0))
+
+ " usedMB=" + Misc::toString((float_t)
(numBytesUsed / 1024.0 / 1024.0)) +
+ " allocMB=" + Misc::toString((float_t)
(numBytesAlloc / 1024.0 / 1024.0))
+ << string("\n");
+ }
+
+ } else {
+ // Not over-allocated (numBytesAlloc <= freeTrigger). If the
+ // bytes actually in use exceed 100% of the budget
+ // (flushTrigger), request a flush by setting bufferIsFull.
+ // Flushing on usage avoids over-allocating and then
+ // freeing with every flush.
+ if (numBytesUsed > flushTrigger) {
+ if (infoStream != NULL) {
+ (*infoStream) << string(" RAM: now flush @ usedMB=") <<
Misc::toString((float_t) (numBytesUsed / 1024.0 / 1024.0)) << string("
allocMB=") << Misc::toString((float_t) (numBytesAlloc / 1024.0 / 1024.0)) <<
string(" triggerMB=") << Misc::toString((float_t) (flushTrigger / 1024.0 /
1024.0)) << string("\n");
+ }
+
+ bufferIsFull = true;
+ }
+ }
+}
+
template class SDocumentsWriter<char>;
template class SDocumentsWriter<TCHAR>;
CL_NS_END
\ No newline at end of file
diff --git a/src/core/CLucene/index/SDocumentWriter.h
b/src/core/CLucene/index/SDocumentWriter.h
index ebb14b3..1e43faa 100644
--- a/src/core/CLucene/index/SDocumentWriter.h
+++ b/src/core/CLucene/index/SDocumentWriter.h
@@ -50,6 +50,8 @@ private:
bool closed{};
std::string segment;// Current segment we are working on
std::vector<uint32_t> docDeltaBuffer;
+ std::ostream* infoStream{};
+ int64_t ramBufferSize;
public:
class FieldMergeState;
@@ -658,12 +660,14 @@ public:
this->closed = this->flushPending = false;
this->threadState = nullptr;
+ this->infoStream = nullptr;
fieldInfos = _CLNEW FieldInfos();
this->closed = this->flushPending = false;
postingsFreeCountDW = postingsAllocCountDW = 0;
docStoreOffset = nextDocID = numDocsInRAM = numDocsInStore =
nextWriteDocID = 0;
+ ramBufferSize = (int64_t) (256 * 1024 * 1024);
}
virtual ~SDocumentsWriter();
int32_t flush(bool closeDocStore) override ;
@@ -698,6 +702,7 @@ public:
bufferIsFull = false;
flushPending = false;
threadState->numThreads = 0;
threadState->resetPostings();
numBytesUsed = 0;
+ balanceRAM(); // run only after usage is cleared: the stale pre-flush numBytesUsed would otherwise re-set bufferIsFull right after a flush
}
@@ -712,9 +717,7 @@ public:
int32_t getMaxBufferedDocs() override {
return 0;
}
- float_t getRAMBufferSizeMB() override {
- return 0.0f;
- }
+
int32_t getNumDocsInRAM() override {
return numDocsInRAM;
}
@@ -731,8 +734,25 @@ public:
}
void abort(AbortException *ae) override {}
void setMaxBufferedDocs(int32_t count) override {}
- void setInfoStream(std::ostream *infoStream) override {}
- void setRAMBufferSizeMB(float_t mb) override {}
+ void setInfoStream(std::ostream *is) override { // nullptr disables balanceRAM() logging
+ this->infoStream = is;
+ }
+ void setRAMBufferSizeMB(float_t mb) override { // mb == -1 disables RAM-based flushing
+ if (mb == -1) { // exact sentinel match; an (int32_t) cast also caught -1.5 and is UB for NaN/out-of-range mb
+ ramBufferSize = -1;
+ } else {
+ ramBufferSize = (int64_t) (mb * 1024 * 1024);
+ }
+ }
+
+ float_t getRAMBufferSizeMB() override { // budget in MB, or -1 when RAM-based flushing is disabled
+ if (ramBufferSize != -1) {
+ return (float_t) (ramBufferSize / (1024.0 * 1024.0));
+ }
+ // disabled: report the -1 sentinel as-is
+ return (float_t) ramBufferSize;
+ }
+
void close() override {}
const std::vector<std::string>& files() override {
if (_files != nullptr)
@@ -740,6 +760,12 @@ public:
_files = _CLNEW std::vector<string>;
return *_files;
}
+ std::string toMB(int64_t v) { // byte count -> MB text with two decimals, used in infoStream messages
+ char out[40];
+ cl_sprintf(out, sizeof(out), "%0.2f", v / (1024.0 * 1024.0));
+ return std::string(out);
+ }
+ void balanceRAM();
void setMaxBufferedDeleteTerms(int32_t _maxBufferedDeleteTerms) override
{_CLTHROW_NOT_IMPLEMENT}
int32_t getMaxBufferedDeleteTerms() override {_CLTHROW_NOT_IMPLEMENT}
std::string closeDocStore() override {_CLTHROW_NOT_IMPLEMENT}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]