This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch clucene
in repository https://gitbox.apache.org/repos/asf/doris-thirdparty.git


The following commit(s) were added to refs/heads/clucene by this push:
     new 58b692c  [Update] add ram balance for SDocumentWriter (#54)
58b692c is described below

commit 58b692c1726a7d1592ec4fad511a5e6f65de09c7
Author: airborne12 <[email protected]>
AuthorDate: Fri Apr 14 14:04:07 2023 +0800

    [Update] add ram balance for SDocumentWriter (#54)
---
 src/core/CLucene/index/SDocumentWriter.cpp | 95 ++++++++++++++++++++++++++++++
 src/core/CLucene/index/SDocumentWriter.h   | 36 +++++++++--
 2 files changed, 126 insertions(+), 5 deletions(-)

diff --git a/src/core/CLucene/index/SDocumentWriter.cpp 
b/src/core/CLucene/index/SDocumentWriter.cpp
index 3b6e66f..57b2d09 100644
--- a/src/core/CLucene/index/SDocumentWriter.cpp
+++ b/src/core/CLucene/index/SDocumentWriter.cpp
@@ -616,6 +616,8 @@ void 
SDocumentsWriter<T>::ThreadState::processDocument(Analyzer *sanalyzer) {
     // We process the document one field at a time
     for (int32_t i = 0; i < numFields; i++)
         fieldDataArray[i]->processField(sanalyzer);
+    if (_parent->ramBufferSize != -1 && _parent->numBytesUsed > 0.95 * 
_parent->ramBufferSize)
+        _parent->balanceRAM();
 }
 
 template<typename T>
@@ -834,6 +836,7 @@ char *SDocumentsWriter<char>::getSCharBlock() {
     char *c;
     if (0 == size) {
         numBytesAlloc += CHAR_BLOCK_SIZE * SCHAR_NUM_BYTE;
+        balanceRAM();
         c = _CL_NEWARRAY(char, CHAR_BLOCK_SIZE);
         memset(c, 0, sizeof(char) * CHAR_BLOCK_SIZE);
     } else {
@@ -874,6 +877,7 @@ void SDocumentsWriter<T>::getPostings(ValueArray<Posting *> 
&postings) {
         if (newPostingsAllocCount > this->postingsFreeListDW.length)
             this->postingsFreeListDW.resize((int32_t) (1.25 * 
newPostingsAllocCount));
 
+        balanceRAM();
         for (size_t i = numToCopy; i < postings.length; i++) {
             postings.values[i] = _CLNEW Posting();
             numBytesAlloc += POSTING_NUM_BYTE;
@@ -1248,6 +1252,7 @@ uint8_t *SDocumentsWriter<T>::getByteBlock(bool 
trackAllocations) {
     uint8_t *b;
     if (0 == size) {
         numBytesAlloc += BYTE_BLOCK_SIZE;
+        balanceRAM();
         b = _CL_NEWARRAY(uint8_t, BYTE_BLOCK_SIZE);
         memset(b, 0, sizeof(uint8_t) * BYTE_BLOCK_SIZE);
     } else {
@@ -1340,6 +1345,96 @@ void SDocumentsWriter<T>::BufferedNorms::fill(int32_t 
docID) {
         upto = docID;
     }
 }
+
+template<typename T>
+void SDocumentsWriter<T>::balanceRAM() {
+    if (ramBufferSize == IndexWriter::DISABLE_AUTO_FLUSH || bufferIsFull)
+        return;
+
+    // We free our allocations if we've allocated 5% over
+    // our allowed RAM buffer
+    const int64_t freeTrigger = (int64_t) (1.05 * ramBufferSize);
+    const int64_t freeLevel = (int64_t) (0.95 * ramBufferSize);
+
+    // We flush when we've used our target usage
+    const int64_t flushTrigger = (int64_t) ramBufferSize;
+
+    if (numBytesAlloc > freeTrigger) {
+        if (infoStream != NULL)
+            (*infoStream) << string("  RAM: now balance allocations: usedMB=") 
<< toMB(numBytesUsed) + string(" vs trigger=") << toMB(flushTrigger) << 
string(" allocMB=") << toMB(numBytesAlloc) << string(" vs trigger=") << 
toMB(freeTrigger) << string(" postingsFree=") << toMB(this->postingsFreeCountDW 
* POSTING_NUM_BYTE) << string(" byteBlockFree=") << toMB(freeByteBlocks.size() 
* BYTE_BLOCK_SIZE) << string(" charBlockFree=") << toMB(freeSCharBlocks.size() 
* CHAR_BLOCK_SIZE * CHAR_NUM_BY [...]
+
+        // When we've crossed 100% of our target Postings
+        // RAM usage, try to free up until we're back down
+        // to 95%
+        const int64_t startBytesAlloc = numBytesAlloc;
+
+        const int32_t postingsFreeChunk = (int32_t) (BYTE_BLOCK_SIZE / 
POSTING_NUM_BYTE);
+
+        int32_t iter = 0;
+
+        // We free equally from each pool in 64 KB
+        // chunks until we are below our threshold
+        // (freeLevel)
+
+        while (numBytesAlloc > freeLevel) {
+            if (0 == freeByteBlocks.size() && 0 == freeSCharBlocks.size() && 0 
== this->postingsFreeCountDW) {
+                // Nothing else to free -- must flush now.
+                bufferIsFull = true;
+                if (infoStream != NULL)
+                    (*infoStream) << string("    nothing to free; now set 
bufferIsFull\n");
+                break;
+            }
+
+            if ((0 == iter % 3) && freeByteBlocks.size() > 0) {
+                freeByteBlocks.remove(freeByteBlocks.size() - 1);
+                numBytesAlloc -= BYTE_BLOCK_SIZE;
+            }
+
+            if ((1 == iter % 3) && freeSCharBlocks.size() > 0) {
+                freeSCharBlocks.remove(freeSCharBlocks.size() - 1);
+                numBytesAlloc -= CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;
+            }
+
+            if ((2 == iter % 3) && this->postingsFreeCountDW > 0) {
+                int32_t numToFree;
+                if (this->postingsFreeCountDW >= postingsFreeChunk)
+                    numToFree = postingsFreeChunk;
+                else
+                    numToFree = this->postingsFreeCountDW;
+                for (size_t i = this->postingsFreeCountDW - numToFree; i < 
this->postingsFreeCountDW; i++) {
+                    _CLDELETE(this->postingsFreeListDW.values[i]);
+                }
+                this->postingsFreeCountDW -= numToFree;
+                this->postingsAllocCountDW -= numToFree;
+                numBytesAlloc -= numToFree * POSTING_NUM_BYTE;
+            }
+
+            iter++;
+        }
+
+        if (infoStream != NULL) {
+            (*infoStream) << "    after free: freedMB=" + 
Misc::toString((float_t) ((startBytesAlloc - numBytesAlloc) / 1024.0 / 1024.0)) 
+
+                                     " usedMB=" + Misc::toString((float_t) 
(numBytesUsed / 1024.0 / 1024.0)) +
+                                     " allocMB=" + Misc::toString((float_t) 
(numBytesAlloc / 1024.0 / 1024.0))
+                          << string("\n");
+        }
+
+    } else {
+        // If we have not crossed the 100% mark, but have
+        // crossed the 95% mark of RAM we are actually
+        // using, go ahead and flush.  This prevents
+        // over-allocating and then freeing, with every
+        // flush.
+        if (numBytesUsed > flushTrigger) {
+            if (infoStream != NULL) {
+                (*infoStream) << string("  RAM: now flush @ usedMB=") << 
Misc::toString((float_t) (numBytesUsed / 1024.0 / 1024.0)) << string(" 
allocMB=") << Misc::toString((float_t) (numBytesAlloc / 1024.0 / 1024.0)) << 
string(" triggerMB=") << Misc::toString((float_t) (flushTrigger / 1024.0 / 
1024.0)) << string("\n");
+            }
+
+            bufferIsFull = true;
+        }
+    }
+}
+
 template class SDocumentsWriter<char>;
 template class SDocumentsWriter<TCHAR>;
 CL_NS_END
\ No newline at end of file
diff --git a/src/core/CLucene/index/SDocumentWriter.h 
b/src/core/CLucene/index/SDocumentWriter.h
index ebb14b3..1e43faa 100644
--- a/src/core/CLucene/index/SDocumentWriter.h
+++ b/src/core/CLucene/index/SDocumentWriter.h
@@ -50,6 +50,8 @@ private:
     bool closed{};
     std::string segment;// Current segment we are working on
     std::vector<uint32_t> docDeltaBuffer;
+    std::ostream* infoStream{};
+    int64_t ramBufferSize;
 
 public:
     class FieldMergeState;
@@ -658,12 +660,14 @@ public:
 
         this->closed = this->flushPending = false;
         this->threadState = nullptr;
+        this->infoStream = nullptr;
         fieldInfos = _CLNEW FieldInfos();
 
 
         this->closed = this->flushPending = false;
         postingsFreeCountDW = postingsAllocCountDW = 0;
         docStoreOffset = nextDocID = numDocsInRAM = numDocsInStore = 
nextWriteDocID = 0;
+        ramBufferSize = (int64_t) (256 * 1024 * 1024);
     }
     virtual ~SDocumentsWriter();
     int32_t flush(bool closeDocStore) override ;
@@ -698,6 +702,7 @@ public:
         bufferIsFull = false;
         flushPending = false;
         threadState->numThreads = 0;
+        balanceRAM();
         threadState->resetPostings();
         numBytesUsed = 0;
     }
@@ -712,9 +717,7 @@ public:
     int32_t getMaxBufferedDocs() override {
         return 0;
     }
-    float_t getRAMBufferSizeMB() override {
-        return 0.0f;
-    }
+
     int32_t getNumDocsInRAM() override {
         return numDocsInRAM;
     }
@@ -731,8 +734,25 @@ public:
     }
     void abort(AbortException *ae) override {}
     void setMaxBufferedDocs(int32_t count) override {}
-    void setInfoStream(std::ostream *infoStream) override {}
-    void setRAMBufferSizeMB(float_t mb) override {}
+    void setInfoStream(std::ostream *is) override {
+        this->infoStream = infoStream;
+    }
+    void setRAMBufferSizeMB(float_t mb) override {
+        if ((int32_t) mb == -1) {
+            ramBufferSize = -1;
+        } else {
+            ramBufferSize = (int64_t) (mb * 1024 * 1024);
+        }
+    }
+
+    float_t getRAMBufferSizeMB() override {
+        if (ramBufferSize == -1) {
+            return (float_t) ramBufferSize;
+        } else {
+            return ramBufferSize / 1024.0 / 1024.0;
+        }
+    }
+
     void close() override {}
     const std::vector<std::string>& files() override {
         if (_files != nullptr)
@@ -740,6 +760,12 @@ public:
         _files = _CLNEW std::vector<string>;
         return *_files;
     }
+    std::string toMB(int64_t v) {
+        char buf[40];
+        cl_sprintf(buf, 40, "%0.2f", v / 1024.0 / 1024.0);
+        return string(buf);
+    }
+    void balanceRAM();
     void setMaxBufferedDeleteTerms(int32_t _maxBufferedDeleteTerms) override 
{_CLTHROW_NOT_IMPLEMENT}
     int32_t getMaxBufferedDeleteTerms() override {_CLTHROW_NOT_IMPLEMENT}
     std::string closeDocStore() override {_CLTHROW_NOT_IMPLEMENT}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to