This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new cdd6594  Issue #1664 Cancel Scheduled SpeculativeReads
cdd6594 is described below

commit cdd659471e1fb75aaf7738302dbf0dfaf44a9127
Author: JV Jujjuri <vjujj...@salesforce.com>
AuthorDate: Mon Sep 10 12:58:47 2018 -0700

    Issue #1664 Cancel Scheduled SpeculativeReads
    
    If configured, every read request schedules a Future task
    to send speculative reads once speculativeReadTimeout elapses.
    When the read completes successfully, this task must be
    canceled; otherwise it keeps consuming memory, and under
    heavy load the tasks accumulate, which forces lengthy
    GC cycles. These lengthy GC cycles may cause ZK lease expiry
    and all sorts of other problems, eventually resulting in application
    errors.
    
    This fix makes sure that the scheduled Futures are canceled
    at the end of the read task.
    
    Signed-off-by: Venkateswararao Jujjuri (JV) <vjujjurisalesforce.com>
    (ref Andrey)
    
    Descriptions of the changes in this PR:
    
    ### Motivation
    
    (Explain: why you're making that change, what is the problem you're trying 
to solve)
    
    ### Changes
    
    (Describe: what changes you have made)
    
    Master Issue: #<master-issue-number>
    
    Author: JV Jujjuri <vjujj...@salesforce.com>
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Andrey Yegorov <None>, Enrico Olivelli <eolive...@gmail.com>, 
Sijie Guo <si...@apache.org>
    
    This closes #1665 from jvrao/ups_speculative_cancel, closes #1664
---
 .../DefaultSpeculativeRequestExecutionPolicy.java  | 11 +++++++----
 .../apache/bookkeeper/client/PendingReadOp.java    |  6 +++++-
 .../client/ReadLastConfirmedAndEntryOp.java        | 11 ++++++++++-
 .../client/SpeculativeRequestExecutionPolicy.java  |  5 ++++-
 .../bookkeeper/client/TestSpeculativeRead.java     | 22 +++++++++++++++++++++-
 5 files changed, 47 insertions(+), 8 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultSpeculativeRequestExecutionPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultSpeculativeRequestExecutionPolicy.java
index 7474e56..b2874e5 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultSpeculativeRequestExecutionPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultSpeculativeRequestExecutionPolicy.java
@@ -26,6 +26,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
@@ -64,18 +65,19 @@ public class DefaultSpeculativeRequestExecutionPolicy 
implements SpeculativeRequ
      *
      * @param scheduler The scheduler service to issue the speculative request
      * @param requestExecutor The executor is used to issue the actual 
speculative requests
+     * @return ScheduledFuture, in case caller needs to cancel it.
      */
     @Override
-    public void initiateSpeculativeRequest(final ScheduledExecutorService 
scheduler,
+    public ScheduledFuture<?> initiateSpeculativeRequest(final 
ScheduledExecutorService scheduler,
             final SpeculativeRequestExecutor requestExecutor) {
-        scheduleSpeculativeRead(scheduler, requestExecutor, 
firstSpeculativeRequestTimeout);
+        return scheduleSpeculativeRead(scheduler, requestExecutor, 
firstSpeculativeRequestTimeout);
     }
 
-    private void scheduleSpeculativeRead(final ScheduledExecutorService 
scheduler,
+    private ScheduledFuture<?> scheduleSpeculativeRead(final 
ScheduledExecutorService scheduler,
                                          final SpeculativeRequestExecutor 
requestExecutor,
                                          final int speculativeRequestTimeout) {
         try {
-            scheduler.schedule(new Runnable() {
+            return scheduler.schedule(new Runnable() {
                 @Override
                 public void run() {
                     ListenableFuture<Boolean> issueNextRequest = 
requestExecutor.issueSpeculativeRequest();
@@ -107,5 +109,6 @@ public class DefaultSpeculativeRequestExecutionPolicy 
implements SpeculativeRequ
                         requestExecutor, speculativeRequestTimeout, re);
             }
         }
+        return null;
     }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index d31a744..4ee9c92 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -482,6 +482,10 @@ class PendingReadOp implements ReadEntryCallback, 
SafeRunnable {
         }
     }
 
+    public ScheduledFuture<?> getSpeculativeTask() {
+        return speculativeTask;
+    }
+
     // I don't think this is ever used in production code -Ivan
     PendingReadOp parallelRead(boolean enabled) {
         this.parallelRead = enabled;
@@ -518,7 +522,7 @@ class PendingReadOp implements ReadEntryCallback, 
SafeRunnable {
         for (LedgerEntryRequest entry : seq) {
             entry.read();
             if (!parallelRead && 
clientCtx.getConf().readSpeculativeRequestPolicy.isPresent()) {
-                clientCtx.getConf().readSpeculativeRequestPolicy.get()
+                speculativeTask = 
clientCtx.getConf().readSpeculativeRequestPolicy.get()
                     .initiateSpeculativeRequest(clientCtx.getScheduler(), 
entry);
             }
         }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
index e61e666..cb9de32 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
@@ -25,6 +25,7 @@ import io.netty.buffer.ByteBuf;
 import java.util.BitSet;
 import java.util.List;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
@@ -64,6 +65,7 @@ class ReadLastConfirmedAndEntryOp implements 
BookkeeperInternalCallbacks.ReadEnt
     private long lastAddConfirmed;
     private long timeOutInMillis;
     private final List<BookieSocketAddress> currentEnsemble;
+    private ScheduledFuture<?> speculativeTask = null;
 
     abstract class ReadLACAndEntryRequest implements AutoCloseable {
 
@@ -461,6 +463,12 @@ class ReadLastConfirmedAndEntryOp implements 
BookkeeperInternalCallbacks.ReadEnt
         return this;
     }
 
+    protected void cancelSpeculativeTask(boolean mayInterruptIfRunning) {
+        if (speculativeTask != null) {
+            speculativeTask.cancel(mayInterruptIfRunning);
+            speculativeTask = null;
+        }
+    }
     /**
      * Speculative Read Logic.
      */
@@ -491,7 +499,7 @@ class ReadLastConfirmedAndEntryOp implements 
BookkeeperInternalCallbacks.ReadEnt
         request.read();
 
         if (!parallelRead && 
clientCtx.getConf().readLACSpeculativeRequestPolicy.isPresent()) {
-            clientCtx.getConf().readLACSpeculativeRequestPolicy.get()
+            speculativeTask = 
clientCtx.getConf().readLACSpeculativeRequestPolicy.get()
                 .initiateSpeculativeRequest(clientCtx.getScheduler(), this);
         }
     }
@@ -521,6 +529,7 @@ class ReadLastConfirmedAndEntryOp implements 
BookkeeperInternalCallbacks.ReadEnt
     private void submitCallback(int rc) {
         long latencyMicros = MathUtils.elapsedMicroSec(requestTimeNano);
         LedgerEntry entry;
+        cancelSpeculativeTask(true);
         if (BKException.Code.OK != rc) {
             clientCtx.getClientStats().getReadLacAndEntryOpLogger()
                 .registerFailedEvent(latencyMicros, TimeUnit.MICROSECONDS);
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/SpeculativeRequestExecutionPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/SpeculativeRequestExecutionPolicy.java
index bff4bb3..e04dc98 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/SpeculativeRequestExecutionPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/SpeculativeRequestExecutionPolicy.java
@@ -21,6 +21,7 @@
 package org.apache.bookkeeper.client;
 
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 
 /**
  * Define a policy for speculative request execution.
@@ -37,6 +38,8 @@ public interface SpeculativeRequestExecutionPolicy {
      *
      * @param scheduler The scheduler service to issue the speculative request
      * @param requestExectuor The executor is used to issue the actual 
speculative requests
+     * @return ScheduledFuture, in case caller needs to cancel it.
      */
-    void initiateSpeculativeRequest(ScheduledExecutorService scheduler, 
SpeculativeRequestExecutor requestExectuor);
+    ScheduledFuture<?> initiateSpeculativeRequest(ScheduledExecutorService 
scheduler,
+            SpeculativeRequestExecutor requestExectuor);
 }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
index 5d251a9..0ae13ba 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
@@ -298,6 +298,26 @@ public class TestSpeculativeRead extends 
BookKeeperClusterTestCase {
     }
 
     /**
+     * Unit test to check if the scheduled speculative task gets cancelled
+     * on successful read.
+     */
+    @Test
+    public void testSpeculativeReadScheduledTaskCancel() throws Exception {
+        long id = getLedgerToRead(3, 2);
+        int timeout = 1000;
+        BookKeeper bkspec = createClient(timeout);
+        LedgerHandle l = bkspec.openLedger(id, digestType, passwd);
+        PendingReadOp op = null;
+        try {
+            op = new PendingReadOp(l, bkspec.getClientCtx(), 0, 5, false);
+            op.initiate();
+            op.future().get();
+        } finally {
+            assertNull("Speculative Read tasks must be null", 
op.getSpeculativeTask());
+        }
+    }
+
+    /**
      * Unit test for the speculative read scheduling method.
      */
     @Test
@@ -353,7 +373,7 @@ public class TestSpeculativeRead extends 
BookKeeperClusterTestCase {
                         }
                         Thread.sleep(1000);
                     }
-                    assertTrue("Request should be done", req0.isComplete());
+                    assertTrue("Request should be done", req.isComplete());
                 }
             }
 

Reply via email to