[ 
https://issues.apache.org/jira/browse/DRILL-7607?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17048154#comment-17048154
 ] 

ASF GitHub Bot commented on DRILL-7607:
---------------------------------------

paul-rogers commented on pull request #2000: DRILL-7607: support dynamic credit based flow control
URL: https://github.com/apache/drill/pull/2000#discussion_r385996548
 
 

 ##########
 File path: exec/java-exec/src/main/java/org/apache/drill/exec/ops/DataTunnelStatusHandler.java
 ##########
 @@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.drill.exec.proto.BitData;
+import org.apache.drill.exec.rpc.Acks;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+
+/**
+ * Listener that keeps track of the status of batches sent, and updates the SendingAccountor when status is received
+ * for each batch
+ */
+public class DataTunnelStatusHandler implements RpcOutcomeListener<BitData.AckWithCredit> {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataTunnelStatusHandler.class);
+  private final SendingAccountor sendingAccountor;
+  private final Consumer<RpcException> consumer;
+
+  public DataTunnelStatusHandler(Consumer<RpcException> consumer, SendingAccountor sendingAccountor) {
+    this.consumer = consumer;
+    this.sendingAccountor = sendingAccountor;
+  }
+
+  @Override
+  public void failed(RpcException ex) {
+    sendingAccountor.decrement();
+    consumer.accept(ex);
+  }
+
+  @Override
+  public void success(BitData.AckWithCredit value, ByteBuf buffer) {
+    sendingAccountor.decrement();
+    if (value.getAllowedCredit() != Acks.FAIL_CREDIT) {
+      return;
+    }
+
+    logger.error("Data not accepted downstream. Stopping future sends.");
 
 Review comment:
   Looking at the code further down, it appears that the fail case indicates a 
query failure. The issue, then, is not that the receiver did not accept the 
data. Rather, the receiver is telling us that the query has died and that 
future sends are irrelevant.
   
   Maybe adjust the log message to indicate that the receiver has informed us 
the query has failed.
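   
   As a rough sketch of what such wording might look like (the helper class, 
the sentinel value of -1, and the message text are illustrative assumptions, 
not code from the PR):

```java
/** Hypothetical helper illustrating one possible log wording; not part of the PR. */
final class AckLogMessages {
  // Assumed sentinel standing in for Acks.FAIL_CREDIT; the real value may differ.
  static final int FAIL_CREDIT = -1;

  private AckLogMessages() {}

  /** Returns the message to log for an ack, or null when the ack is not a failure signal. */
  static String messageFor(int allowedCredit) {
    if (allowedCredit != FAIL_CREDIT) {
      return null; // normal ack: nothing to report
    }
    return "Receiver reported that the query has failed. Stopping future sends.";
  }
}
```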
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Dynamic credit based flow control
> ---------------------------------
>
>                 Key: DRILL-7607
>                 URL: https://issues.apache.org/jira/browse/DRILL-7607
>             Project: Apache Drill
>          Issue Type: New Feature
>          Components:  Server, Execution - RPC
>    Affects Versions: 1.17.0
>            Reporter: Weijie Tong
>            Assignee: Weijie Tong
>            Priority: Major
>             Fix For: 1.18.0
>
>
> Drill currently has static credit-based flow control between the batch sender 
> and receiver. That is, every sender sends its batches through the DataTunnel 
> under a fixed semaphore of 3 permits. On the receiver side there are two cases: 
> the UnlimitedRawBatchBuffer has a receiver semaphore of 6 * fragmentCount, and 
> the SpoolingRawBatchBuffer behaves as if it had an unlimited receiving 
> semaphore, since it can flush data to disk.
> The static credit has the following weak points:
> 1. When the sent batches are small (e.g. a single bigint column) and the 
> receiver has ample memory, the sender still cannot send its data rapidly.
> 2. Because the static credit does not set the semaphore count according to 
> the corresponding receiver's memory space, there is still a risk of the 
> receiver running out of memory (OOM).
> 3. Because the sender semaphore is small, the sender cannot send batches 
> consecutively: it must wait for an Ack to release a permit. The sender's 
> execution pipeline then stalls, as do its leaf execution nodes.
> The dynamic credit-based flow control could solve these problems. It starts 
> from the static credit flow control. The receiver then collects some batches 
> to calculate the average batch size. Based on the receiver-side memory space, 
> the receiver computes a runtime sender credit and a receiver-side total 
> credit. The receiver sends the runtime sender credit to the sender in the 
> Ack response. The sender switches to the runtime sender credit when it 
> receives an Ack response carrying a runtime credit value.
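
The receiver-side calculation described above could be sketched roughly as 
follows. This is only an illustration: the class name, method names, and the 
exact formula (memory budget divided by average batch size, split evenly across 
senders) are assumptions, not the implementation in the pull request.

```java
/** Hypothetical sketch of a receiver-side credit calculation; names and formula are assumptions. */
final class DynamicCreditSketch {
  // Static starting credit per sender, matching the fixed value of 3 in the issue text.
  static final int INITIAL_SENDER_CREDIT = 3;

  private long sampledBytes;
  private int sampledBatches;

  /** Records the size of one received batch for the running average. */
  void recordBatch(long batchBytes) {
    sampledBytes += batchBytes;
    sampledBatches++;
  }

  /**
   * Returns the credit to advertise to each sender in the Ack response.
   * Falls back to the static credit until at least one batch has been sampled.
   */
  int senderCredit(long receiverMemoryBytes, int senderCount) {
    if (sampledBatches == 0 || senderCount <= 0) {
      return INITIAL_SENDER_CREDIT;
    }
    long averageBatchBytes = Math.max(1L, sampledBytes / sampledBatches);
    // Receiver-side total credit: how many average-sized batches fit in the budget.
    long totalCredit = receiverMemoryBytes / averageBatchBytes;
    // Runtime sender credit: an even share of the total, never below 1.
    long perSender = totalCredit / senderCount;
    return (int) Math.max(1L, Math.min(perSender, Integer.MAX_VALUE));
  }
}
```

With this sketch, small batches against a large memory budget yield a sender 
credit well above 3, while large batches shrink it toward 1.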



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
