This is an automated email from the ASF dual-hosted git repository.

ayegorov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f22d23  fix: Bookkeeper client throttling logic is based upon entryId instead of ledgerId
0f22d23 is described below

commit 0f22d238c225ec667c58e1c5029644478d636128
Author: Andrey Yegorov <[email protected]>
AuthorDate: Mon May 3 11:44:55 2021 -0700

    fix: Bookkeeper client throttling logic is based upon entryId instead of ledgerId
    
    Descriptions of the changes in this PR:
    
    Fixes: #2660
    
    ### Changes
    
    Change isWriteSetWritable() to use the ledgerId, instead of the entryId passed in by callers, for the client selection
    
    Master Issue: #2660
    
    Reviewers: Enrico Olivelli <[email protected]>, Ivan Kelly <[email protected]>
    
    This closes #2664 from dlg99/fix/2660
---
 .../java/org/apache/bookkeeper/client/LedgerHandle.java  | 16 ++++++++--------
 .../org/apache/bookkeeper/client/LedgerHandleAdv.java    |  2 +-
 2 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 806dd8b..48818bd 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -880,7 +880,7 @@ public class LedgerHandle implements WriteHandle {
             // unresponsive thus helpful enough.
             DistributionSchedule.WriteSet ws = 
distributionSchedule.getWriteSet(firstEntry);
             try {
-                if (!waitForWritable(ws, firstEntry, ws.size() - 1, 
clientCtx.getConf().waitForWriteSetMs)) {
+                if (!waitForWritable(ws, ws.size() - 1, 
clientCtx.getConf().waitForWriteSetMs)) {
                     op.allowFailFastOnUnwritableChannel();
                 }
             } finally {
@@ -1213,7 +1213,7 @@ public class LedgerHandle implements WriteHandle {
     }
 
     private boolean isWriteSetWritable(DistributionSchedule.WriteSet writeSet,
-                                       long key, int allowedNonWritableCount) {
+                                       int allowedNonWritableCount) {
         if (allowedNonWritableCount < 0) {
             allowedNonWritableCount = 0;
         }
@@ -1224,7 +1224,7 @@ public class LedgerHandle implements WriteHandle {
         int nonWritableCount = 0;
         List<BookieId> currentEnsemble = getCurrentEnsemble();
         for (int i = 0; i < sz; i++) {
-            if 
(!clientCtx.getBookieClient().isWritable(currentEnsemble.get(i), key)) {
+            if 
(!clientCtx.getBookieClient().isWritable(currentEnsemble.get(i), ledgerId)) {
                 nonWritableCount++;
                 if (nonWritableCount >= allowedNonWritableCount) {
                     return false;
@@ -1239,21 +1239,21 @@ public class LedgerHandle implements WriteHandle {
         return true;
     }
 
-    protected boolean waitForWritable(DistributionSchedule.WriteSet writeSet, 
long key,
+    protected boolean waitForWritable(DistributionSchedule.WriteSet writeSet,
                                     int allowedNonWritableCount, long 
durationMs) {
         if (durationMs < 0) {
             return true;
         }
 
         final long startTime = MathUtils.nowInNano();
-        boolean success = isWriteSetWritable(writeSet, key, 
allowedNonWritableCount);
+        boolean success = isWriteSetWritable(writeSet, 
allowedNonWritableCount);
 
         if (!success && durationMs > 0) {
             int backoff = 1;
             final int maxBackoff = 4;
             final long deadline = startTime + 
TimeUnit.MILLISECONDS.toNanos(durationMs);
 
-            while (!isWriteSetWritable(writeSet, key, 
allowedNonWritableCount)) {
+            while (!isWriteSetWritable(writeSet, allowedNonWritableCount)) {
                 if (MathUtils.nowInNano() < deadline) {
                     long maxSleep = MathUtils.elapsedMSec(startTime);
                     if (maxSleep < 0) {
@@ -1265,7 +1265,7 @@ public class LedgerHandle implements WriteHandle {
                         TimeUnit.MILLISECONDS.sleep(sleepMs);
                     } catch (InterruptedException e) {
                         Thread.currentThread().interrupt();
-                        success = isWriteSetWritable(writeSet, key, 
allowedNonWritableCount);
+                        success = isWriteSetWritable(writeSet, 
allowedNonWritableCount);
                         break;
                     }
                     if (backoff <= maxBackoff) {
@@ -1340,7 +1340,7 @@ public class LedgerHandle implements WriteHandle {
 
         DistributionSchedule.WriteSet ws = 
distributionSchedule.getWriteSet(op.getEntryId());
         try {
-            if (!waitForWritable(ws, op.getEntryId(), 0, 
clientCtx.getConf().waitForWriteSetMs)) {
+            if (!waitForWritable(ws, 0, 
clientCtx.getConf().waitForWriteSetMs)) {
                 op.allowFailFastOnUnwritableChannel();
             }
         } finally {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
index 2ea0e0a..1bdc653 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
@@ -264,7 +264,7 @@ public class LedgerHandleAdv extends LedgerHandle 
implements WriteAdvHandle {
         }
 
         if (!waitForWritable(distributionSchedule.getWriteSet(op.getEntryId()),
-                    op.getEntryId(), 0, 
clientCtx.getConf().waitForWriteSetMs)) {
+                    0, clientCtx.getConf().waitForWriteSetMs)) {
             op.allowFailFastOnUnwritableChannel();
         }
 

Reply via email to