[ https://issues.apache.org/jira/browse/DRILL-7607?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17048706#comment-17048706 ]

ASF GitHub Bot commented on DRILL-7607:
---------------------------------------

paul-rogers commented on pull request #2000: DRILL-7607: support dynamic credit 
based flow control
URL: https://github.com/apache/drill/pull/2000#discussion_r386149418
 
 

 ##########
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
 ##########
 @@ -90,14 +100,39 @@ public boolean isEmpty() {
 
     @Override
     public void add(RawFragmentBatch batch) {
+      int recordCount = batch.getHeader().getDef().getRecordCount();
+      long batchByteSize = batch.getByteCount();
+      if (recordCount != 0) {
+        // skip header-only batches (e.g. the first, schema-only batch)
+        totalBatchSize += batchByteSize;
+        sampleTimes++;
+      }
+      if (sampleTimes == maxSampleTimes) {
+        long averageBatchSize = totalBatchSize / sampleTimes;
+        // enough samples collected; make a decision
+        long limit = context.getAllocator().getLimit();
 Review comment:
   As it turns out, it is VERY hard to compute the memory used for a batch. The 
"batch sizer" classes use a mechanism that dives below the level of vectors 
into the ledgers. See `ValueVector.collectLedgers(Set<BufferLedger> ledgers)`. 
This turned out to be the only workable solution because deserialized data 
shares buffers, as I recall, so simply summing buffer sizes gives the wrong 
answer. (It's been a couple of years since we added that code, so I can't 
remember the details.)
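   For reference, here is a minimal sketch of that ledger-based accounting, 
modeled on what the batch sizer does. The identity set and the 
`getAccountedSize()` call reflect my reading of the allocator code and may 
have drifted; treat this as an illustration, not the actual implementation:

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

import org.apache.drill.exec.memory.AllocationManager.BufferLedger;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorWrapper;

public class LedgerMemoryEstimator {

  /**
   * Estimates the memory actually held by a batch. Collecting ledgers into
   * an identity set de-duplicates buffers shared across vectors, which is
   * why summing per-vector buffer sizes over-counts for deserialized data.
   */
  public static long accountedSize(VectorAccessible batch) {
    Set<BufferLedger> ledgers = Collections.newSetFromMap(new IdentityHashMap<>());
    for (VectorWrapper<?> vw : batch) {
      vw.getValueVector().collectLedgers(ledgers);
    }
    long total = 0;
    for (BufferLedger ledger : ledgers) {
      total += ledger.getAccountedSize();
    }
    return total;
  }
}
```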
   
   There are two better solutions. One would be to carry the data size along 
with each batch. But even that has the problems described in my earlier note.
   
   The best solution would be to have an RM (resource management) layer set 
the target batch size. Each operator would then work to produce batches of 
that size. (The scan can do so using the Result Set Loader; other operators 
can use the "batch sizer" method.) That way, memory calculations become easier 
not just for this feature, but also for sorts and other buffering operators.
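   As a purely illustrative sketch of the per-operator side of that contract 
(the class and names below are invented, not Drill APIs):

```java
/**
 * Illustrative only: decides when to close out the current output batch so
 * that emitted batches track an RM-chosen target size.
 */
public final class TargetBatchGate {
  private final long targetBatchBytes;  // would be set by the (future) RM layer
  private long currentBatchBytes;

  public TargetBatchGate(long targetBatchBytes) {
    this.targetBatchBytes = targetBatchBytes;
  }

  /** Account for one appended row; returns true when the batch should be flushed. */
  public boolean addRow(long rowBytes) {
    currentBatchBytes += rowBytes;
    if (currentBatchBytes >= targetBatchBytes) {
      currentBatchBytes = 0;
      return true;  // caller sends the batch downstream and starts a new one
    }
    return false;
  }
}
```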
   
   Since adding a target batch size is beyond the scope of this PR, I'd 
suggest adding some config options instead (a rough sketch of reading them 
follows the list):
   
   * Enable/disable this feature
   * Target memory limit for buffering
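   Something like the following could read those boot options. The option 
names and the `DynamicCreditSettings` class are invented for illustration; 
`DrillConfig` exposes the usual typesafe-config getters:

```java
import org.apache.drill.common.config.DrillConfig;

public class DynamicCreditSettings {
  // Hypothetical boot-option names; pick whatever fits the existing
  // drill.exec.rpc.* namespace.
  public static final String ENABLED_KEY =
      "drill.exec.rpc.data.dynamic_credit.enabled";
  public static final String MEMORY_LIMIT_KEY =
      "drill.exec.rpc.data.dynamic_credit.memory_limit";

  public final boolean enabled;
  public final long memoryLimitBytes;

  public DynamicCreditSettings(DrillConfig config) {
    // Boot (config) options are fixed at startup, which seems acceptable
    // here since the flow-control policy rarely needs per-query tuning.
    this.enabled = config.getBoolean(ENABLED_KEY);
    this.memoryLimitBytes = config.getLong(MEMORY_LIMIT_KEY);
  }
}
```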
   
   That way, until we can control batch sizes, we can at least adjust the 
buffering here to deal with different workloads.
   
   It is probably fine to make these config (boot) options rather than system 
options.
   
   Finally, my (limited) understanding is that spooling for the receiver 
exists, but has never been used. First, it is very complex. Second, it could 
actually slow query execution because of the need to write to, then read from, 
disk. Third, we'd need a way to manage the maximum disk space used for 
spooling.
   
   So, given that the code has never been used (I believe), it is not 
surprising that it contains bugs. Thanks for finding them.
   
   Last I checked (my knowledge may be old), only sort, join, and hash agg can 
spill to disk well.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Dynamic credit based flow control
> ---------------------------------
>
>                 Key: DRILL-7607
>                 URL: https://issues.apache.org/jira/browse/DRILL-7607
>             Project: Apache Drill
>          Issue Type: New Feature
>          Components:  Server, Execution - RPC
>    Affects Versions: 1.17.0
>            Reporter: Weijie Tong
>            Assignee: Weijie Tong
>            Priority: Major
>             Fix For: 1.18.0
>
>
> Drill currently has static credit-based flow control between the batch 
> sender and receiver: every sender sends its batches through the DataTunnel 
> gated by a static semaphore of 3. On the receiver side there are two cases: 
> the UnlimitedRawBatchBuffer has a receiver semaphore of 6 * fragmentCount, 
> while the SpoolingRawBatchBuffer behaves as if it had an unlimited receiving 
> semaphore, since it can flush data to disk.
> The static credit has the following weak points:
> 1. When the sent batches are small (e.g. a single BIGINT column) and the 
> receiver has plenty of memory, the sender still cannot send out its data 
> rapidly.
> 2. Because the static credit does not size the semaphore according to the 
> receiver's memory space, it still risks driving the receiver out of memory.
> 3. Because the sender semaphore is small, the sender cannot send its batches 
> consecutively: it must wait for an Ack to release a semaphore, so the 
> sender's execution pipeline stalls, and so do its leaf execution nodes.
> Dynamic credit-based flow control solves these problems. It starts from the 
> static credit flow control. The receiver then samples a number of batches to 
> compute the average batch size. Based on the receiver-side memory space, the 
> receiver derives a runtime sender credit and a receiver-side total credit. 
> The receiver sends the runtime sender credit to the sender in the Ack 
> response, and the sender switches to that credit when it receives an Ack 
> carrying a runtime credit value.
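> As an illustrative sketch of that receiver-side decision (the class and 
> parameter names below are invented for illustration; they are not the PR's 
> actual code):
> {code:java}
> // Sketch, under the assumptions above: sample batch sizes, then size the
> // per-sender credit to the memory the receiver can devote to buffering.
> public final class CreditCalculator {
>   private long totalBatchBytes;
>   private int sampleCount;
>
>   /** Record one sampled data batch (callers skip header-only batches). */
>   public void sample(long batchBytes) {
>     totalBatchBytes += batchBytes;
>     sampleCount++;
>   }
>
>   /**
>    * @param memoryLimit bytes the receiver may spend on buffered batches
>    * @param senderCount number of sending fragments feeding this receiver
>    * @param maxCredit   cap so one outlier sample cannot inflate the credit
>    * @return per-sender credit to piggyback on the next Ack
>    */
>   public int runtimeSenderCredit(long memoryLimit, int senderCount, int maxCredit) {
>     long avgBatchBytes = Math.max(1, totalBatchBytes / Math.max(1, sampleCount));
>     long perSender = (memoryLimit / avgBatchBytes) / Math.max(1, senderCount);
>     return (int) Math.max(1, Math.min(maxCredit, perSender));
>   }
> }
> {code}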



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
