This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push: new cdd6594 Issue #1664 Cancel Scheduled SpeculativeReads cdd6594 is described below commit cdd659471e1fb75aaf7738302dbf0dfaf44a9127 Author: JV Jujjuri <vjujj...@salesforce.com> AuthorDate: Mon Sep 10 12:58:47 2018 -0700 Issue #1664 Cancel Scheduled SpeculativeReads If configured every read request schedules a Future task to send speculative reads on speculativeReadTimeout. When the read is completed successfully, this task must be canceled otherwise it leads to memory consumption and under heavy load the tasks get accumulated which forces lengthy GC cycles. These lengthy GC cycles may cause ZK lease expiry and all other sorts of problems eventually resulting in application errors. This fix makes sure that the scheduled Futures are cancelled at the end of read task. Signed-off-by: Venkateswararao Jujjuri (JV) <vjujjurisalesforce.com> (ref Andrey) Descriptions of the changes in this PR: ### Motivation (Explain: why you're making that change, what is the problem you're trying to solve) ### Changes (Describe: what changes you have made) Master Issue: #<master-issue-number> Author: JV Jujjuri <vjujj...@salesforce.com> Author: Sijie Guo <si...@apache.org> Reviewers: Andrey Yegorov <None>, Enrico Olivelli <eolive...@gmail.com>, Sijie Guo <si...@apache.org> This closes #1665 from jvrao/ups_speculative_cancel, closes #1664 --- .../DefaultSpeculativeRequestExecutionPolicy.java | 11 +++++++---- .../apache/bookkeeper/client/PendingReadOp.java | 6 +++++- .../client/ReadLastConfirmedAndEntryOp.java | 11 ++++++++++- .../client/SpeculativeRequestExecutionPolicy.java | 5 ++++- .../bookkeeper/client/TestSpeculativeRead.java | 22 +++++++++++++++++++++- 5 files changed, 47 insertions(+), 8 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultSpeculativeRequestExecutionPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultSpeculativeRequestExecutionPolicy.java index 7474e56..b2874e5 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultSpeculativeRequestExecutionPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultSpeculativeRequestExecutionPolicy.java @@ -26,6 +26,7 @@ import com.google.common.util.concurrent.ListenableFuture; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; @@ -64,18 +65,19 @@ public class DefaultSpeculativeRequestExecutionPolicy implements SpeculativeRequ * * @param scheduler The scheduler service to issue the speculative request * @param requestExecutor The executor is used to issue the actual speculative requests + * @return ScheduledFuture, in case caller needs to cancel it. */ @Override - public void initiateSpeculativeRequest(final ScheduledExecutorService scheduler, + public ScheduledFuture<?> initiateSpeculativeRequest(final ScheduledExecutorService scheduler, final SpeculativeRequestExecutor requestExecutor) { - scheduleSpeculativeRead(scheduler, requestExecutor, firstSpeculativeRequestTimeout); + return scheduleSpeculativeRead(scheduler, requestExecutor, firstSpeculativeRequestTimeout); } - private void scheduleSpeculativeRead(final ScheduledExecutorService scheduler, + private ScheduledFuture<?> scheduleSpeculativeRead(final ScheduledExecutorService scheduler, final SpeculativeRequestExecutor requestExecutor, final int speculativeRequestTimeout) { try { - scheduler.schedule(new Runnable() { + return scheduler.schedule(new Runnable() { @Override public void run() { ListenableFuture<Boolean> issueNextRequest = requestExecutor.issueSpeculativeRequest(); @@ -107,5 +109,6 @@ public class DefaultSpeculativeRequestExecutionPolicy implements SpeculativeRequ requestExecutor, speculativeRequestTimeout, re); } } + return null; } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java index d31a744..4ee9c92 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java @@ -482,6 +482,10 @@ class PendingReadOp implements ReadEntryCallback, SafeRunnable { } } + public ScheduledFuture<?> getSpeculativeTask() { + return speculativeTask; + } + // I don't think this is ever used in production code -Ivan PendingReadOp parallelRead(boolean enabled) { this.parallelRead = enabled; @@ -518,7 +522,7 @@ class PendingReadOp implements ReadEntryCallback, SafeRunnable { for (LedgerEntryRequest entry : seq) { entry.read(); if (!parallelRead && clientCtx.getConf().readSpeculativeRequestPolicy.isPresent()) { - clientCtx.getConf().readSpeculativeRequestPolicy.get() + speculativeTask = clientCtx.getConf().readSpeculativeRequestPolicy.get() .initiateSpeculativeRequest(clientCtx.getScheduler(), entry); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java index e61e666..cb9de32 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java @@ -25,6 +25,7 @@ import io.netty.buffer.ByteBuf; import java.util.BitSet; import java.util.List; import java.util.concurrent.Callable; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.bookkeeper.client.impl.LedgerEntryImpl; @@ -64,6 +65,7 @@ class ReadLastConfirmedAndEntryOp implements BookkeeperInternalCallbacks.ReadEnt private long lastAddConfirmed; private long timeOutInMillis; private final List<BookieSocketAddress> currentEnsemble; + private ScheduledFuture<?> speculativeTask = null; abstract class ReadLACAndEntryRequest implements AutoCloseable { @@ -461,6 +463,12 @@ class ReadLastConfirmedAndEntryOp implements BookkeeperInternalCallbacks.ReadEnt return this; } + protected void cancelSpeculativeTask(boolean mayInterruptIfRunning) { + if (speculativeTask != null) { + speculativeTask.cancel(mayInterruptIfRunning); + speculativeTask = null; + } + } /** * Speculative Read Logic. */ @@ -491,7 +499,7 @@ class ReadLastConfirmedAndEntryOp implements BookkeeperInternalCallbacks.ReadEnt request.read(); if (!parallelRead && clientCtx.getConf().readLACSpeculativeRequestPolicy.isPresent()) { - clientCtx.getConf().readLACSpeculativeRequestPolicy.get() + speculativeTask = clientCtx.getConf().readLACSpeculativeRequestPolicy.get() .initiateSpeculativeRequest(clientCtx.getScheduler(), this); } } @@ -521,6 +529,7 @@ class ReadLastConfirmedAndEntryOp implements BookkeeperInternalCallbacks.ReadEnt private void submitCallback(int rc) { long latencyMicros = MathUtils.elapsedMicroSec(requestTimeNano); LedgerEntry entry; + cancelSpeculativeTask(true); if (BKException.Code.OK != rc) { clientCtx.getClientStats().getReadLacAndEntryOpLogger() .registerFailedEvent(latencyMicros, TimeUnit.MICROSECONDS); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/SpeculativeRequestExecutionPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/SpeculativeRequestExecutionPolicy.java index bff4bb3..e04dc98 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/SpeculativeRequestExecutionPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/SpeculativeRequestExecutionPolicy.java @@ -21,6 +21,7 @@ package org.apache.bookkeeper.client; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; /** * Define a policy for speculative request execution. @@ -37,6 +38,8 @@ public interface SpeculativeRequestExecutionPolicy { * * @param scheduler The scheduler service to issue the speculative request * @param requestExectuor The executor is used to issue the actual speculative requests + * @return ScheduledFuture, in case caller needs to cancel it. */ - void initiateSpeculativeRequest(ScheduledExecutorService scheduler, SpeculativeRequestExecutor requestExectuor); + ScheduledFuture<?> initiateSpeculativeRequest(ScheduledExecutorService scheduler, + SpeculativeRequestExecutor requestExectuor); } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java index 5d251a9..0ae13ba 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java @@ -298,6 +298,26 @@ public class TestSpeculativeRead extends BookKeeperClusterTestCase { } /** + * Unit test to check if the scheduled speculative task gets cancelled + * on successful read. + */ + @Test + public void testSpeculativeReadScheduledTaskCancel() throws Exception { + long id = getLedgerToRead(3, 2); + int timeout = 1000; + BookKeeper bkspec = createClient(timeout); + LedgerHandle l = bkspec.openLedger(id, digestType, passwd); + PendingReadOp op = null; + try { + op = new PendingReadOp(l, bkspec.getClientCtx(), 0, 5, false); + op.initiate(); + op.future().get(); + } finally { + assertNull("Speculative Read tasks must be null", op.getSpeculativeTask()); + } + } + + /** * Unit test for the speculative read scheduling method. */ @Test @@ -353,7 +373,7 @@ public class TestSpeculativeRead extends BookKeeperClusterTestCase { } Thread.sleep(1000); } - assertTrue("Request should be done", req0.isComplete()); + assertTrue("Request should be done", req.isComplete()); } }