[
https://issues.apache.org/jira/browse/DRILL-7607?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17048214#comment-17048214
]
ASF GitHub Bot commented on DRILL-7607:
---------------------------------------
weijietong commented on pull request #2000: DRILL-7607: support dynamic credit
based flow control
URL: https://github.com/apache/drill/pull/2000#discussion_r386006585
##########
File path:
exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
##########
@@ -90,14 +100,39 @@ public boolean isEmpty() {
@Override
public void add(RawFragmentBatch batch) {
+    int recordCount = batch.getHeader().getDef().getRecordCount();
+    long batchByteSize = batch.getByteCount();
+    if (recordCount != 0) {
+      // skip the first header batch
+      totalBatchSize += batchByteSize;
+      sampleTimes++;
+    }
+    if (sampleTimes == maxSampleTimes) {
+      long averageBatchSize = totalBatchSize / sampleTimes;
+      // make a decision
+      long limit = context.getAllocator().getLimit();
Review comment:
A good and valuable question, thanks, though it is a tough one. It is really a
resource management problem. Firstly, I did not know the memory limit defaults
to 10 GB until you mentioned it; thanks for pointing that out.
With Drill's statistics-based memory allocation mechanism, one drillbit's
memory allocation cannot be managed accurately, so whatever we do, we risk
OOM. Consider what other systems do: they account for the total memory used by
each node, and if a node does not have enough memory to satisfy a query, the
resource manager will not assign the query to that node. After a node has
accepted a query, they normally size the network memory space and the
execution memory space separately, so the system stays memory-safe.
To solve the memory risk, not only in this PR but also in the current
implementation, we may need a centralized resource manager rather than the
current rough peer-node resource management model.
We could add a configuration item to cap a fragment node's network memory
usage at an appropriate value (according to our cluster's hardware memory
size). But we could not control how many receiver nodes run in parallel on one
drillbit.
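For illustration, here is a minimal sketch of how such a runtime credit could be derived from a memory budget and the sampled average batch size, in the spirit of the truncated hunk above. All names (`CreditSketch`, `deriveCredit`), the per-sender split, and the clamp bounds are assumptions for this sketch, not Drill's actual code:

```java
// Hypothetical sketch: derive a runtime sender credit from a receiver-side
// memory budget and the sampled average batch size. Not Drill's actual code.
final class CreditSketch {
  static final int MIN_CREDIT = 3;   // floor at the existing static credit of 3
  static final int MAX_CREDIT = 64;  // arbitrary safety cap (assumption)

  static int deriveCredit(long memoryBudget, long totalBatchSize,
                          int sampleTimes, int senderCount) {
    // Average batch size observed over the sampling window.
    long averageBatchSize = totalBatchSize / sampleTimes;
    // Split the budget across all senders, then convert bytes to batches.
    long perSender = memoryBudget / Math.max(1, senderCount);
    long credit = perSender / Math.max(1, averageBatchSize);
    return (int) Math.max(MIN_CREDIT, Math.min(MAX_CREDIT, credit));
  }
}
```

Flooring the result at the static default would keep behaviour no worse than the existing fixed credit even when the budget is tiny.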
To your last question: the sender's current implementation holds a calculated
number of rows before flushing out a batch, so the row count per batch would
not be a problem. But in some situations, e.g. when a row has a text column,
the text size may not be uniform across batches, so OOM is still possible.
Maybe we could check for OOM, and if it happens, notify the sender to drop to
half its credit value?
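That last idea amounts to a multiplicative back-off of the credit. A minimal sketch, assuming the receiver detects memory pressure somehow and piggybacks the reduced credit on the next Ack; the class name, trigger, and floor are all hypothetical, not Drill's code:

```java
// Hypothetical sketch: halve the sender credit on observed memory pressure,
// never dropping below a minimum of 1 outstanding batch. Assumptions only.
final class CreditBackoff {
  static final int MIN_CREDIT = 1;

  private int credit;

  CreditBackoff(int initialCredit) {
    this.credit = initialCredit;
  }

  /** Called when the receiver hits memory pressure (e.g. an allocation
   *  failure). Returns the new credit to send back on the next Ack so the
   *  sender slows down. */
  int onMemoryPressure() {
    credit = Math.max(MIN_CREDIT, credit / 2);
    return credit;
  }

  int credit() {
    return credit;
  }
}
```

Halving converges to the floor in a logarithmic number of steps, so even a badly mis-estimated credit would shrink quickly.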
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Dynamic credit based flow control
> ---------------------------------
>
> Key: DRILL-7607
> URL: https://issues.apache.org/jira/browse/DRILL-7607
> Project: Apache Drill
> Issue Type: New Feature
> Components: Server, Execution - RPC
> Affects Versions: 1.17.0
> Reporter: Weijie Tong
> Assignee: Weijie Tong
> Priority: Major
> Fix For: 1.18.0
>
>
> Drill currently has a static credit based flow control between the batch
> sender and receiver. That means all senders send out their batches through
> the DataTunnel gated by a static semaphore of 3 permits. On the receiver
> side there are two cases: the UnlimitedRawBatchBuffer has a 6 * fragmentCount
> receiver semaphore, while the SpoolingRawBatchBuffer acts as if it had an
> unlimited receiving semaphore, since it can flush data to disk.
> The static credit has the following weak points:
> 1. When the sent batch size is small (e.g. it has only one bigint column)
> and the receiver has a larger memory space, the sender still cannot send
> out its data rapidly.
> 2. As the static credit does not set the semaphore count according to the
> corresponding receiver's memory space, it still risks making the receiver
> OOM.
> 3. As the sender semaphore is small, the sender cannot send its batches
> consecutively, since it must wait for an Ack to release a permit; its
> corresponding execution pipeline then halts, and so do its leaf execution
> nodes.
> Dynamic credit based flow control can solve these problems. It starts from
> the static credit flow control. The receiver then collects some batches to
> calculate the average batch size. According to the receiver-side memory
> space, the receiver derives a runtime sender credit and a receiver-side
> total credit. The receiver sends the runtime sender credit to the sender in
> the Ack response. The sender switches to the runtime sender credit when it
> receives an Ack response carrying a runtime credit value.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)