This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new ed7696b  Bookies should not queue read request indefinitely
ed7696b is described below

commit ed7696b130a0b7b6fb63b4d37c4a873fa1f8f96f
Author: Aaron Gresch <[email protected]>
AuthorDate: Mon Nov 13 11:01:56 2017 -0800

    Bookies should not queue read request indefinitely
    
    Integrating some changes made to Yahoo bookkeeper by Matteo Merli.
    
    The Apache BookKeeper thread pool used by OrderedScheduler is different
    and does not support setting a queue size limit, so I created
    BoundedScheduledExecutorService, which checks the queue size instead.
    Other than that, this code change is pretty similar to Yahoo's.
    
    Original bug description for this change:
    
    Since we are using a thread pool to handle read requests in bookies, we 
have seen that when the Bookie read IO is maxed out, the requests are being 
accumulated in the bookie.
    
    Essentially, the bookie is busy serving read requests that have already 
been timed out on the client side (end to end read latency can reach hours..). 
The queue keeps growing indefinitely, leading to OOM errors and bookie restart.
    
    All unit tests pass for me locally with this change.
    
    Author: Aaron Gresch <[email protected]>
    
    Reviewers: Ivan Kelly <[email protected]>, Jia Zhai <None>, Sijie Guo 
<[email protected]>
    
    This closes #706 from agresch/agresch_cms1254
---
 bookkeeper-common/pom.xml                          |   5 +
 .../util/BoundedScheduledExecutorService.java      | 143 +++++++++++++++++++++
 .../bookkeeper/common/util/OrderedScheduler.java   |  23 +++-
 .../src/main/proto/BookkeeperProtocol.proto        |   1 +
 bookkeeper-server/conf/bk_server.conf              |   8 ++
 .../org/apache/bookkeeper/client/BKException.java  |   8 ++
 .../apache/bookkeeper/client/api/BKException.java  |   5 +-
 .../bookkeeper/conf/ServerConfiguration.java       |  44 +++++++
 .../apache/bookkeeper/proto/BookieProtocol.java    |   5 +
 .../bookkeeper/proto/BookieRequestProcessor.java   |  77 +++++++++--
 .../bookkeeper/proto/PerChannelBookieClient.java   |   5 +
 .../bookkeeper/util/OrderedSafeExecutor.java       |   8 +-
 .../bookkeeper/client/TestMaxSizeWorkersQueue.java | 123 ++++++++++++++++++
 13 files changed, 436 insertions(+), 19 deletions(-)

diff --git a/bookkeeper-common/pom.xml b/bookkeeper-common/pom.xml
index b369630..9051b59 100644
--- a/bookkeeper-common/pom.xml
+++ b/bookkeeper-common/pom.xml
@@ -47,6 +47,11 @@
       <version>${commons-lang3.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.google.errorprone</groupId>
+      <artifactId>error_prone_annotations</artifactId>
+      <version>2.1.2</version>
+    </dependency>
   </dependencies>
   <build>
     <plugins>
diff --git 
a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/BoundedScheduledExecutorService.java
 
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/BoundedScheduledExecutorService.java
new file mode 100644
index 0000000..b7b9151
--- /dev/null
+++ 
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/BoundedScheduledExecutorService.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.common.util;
+
+import com.google.common.util.concurrent.ForwardingListeningExecutorService;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableScheduledFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Implements {@link ListeningScheduledExecutorService} and allows limiting 
the number
+ * of tasks to be scheduled in the thread's queue.
+ *
+ */
+public class BoundedScheduledExecutorService extends 
ForwardingListeningExecutorService
+        implements ListeningScheduledExecutorService {
+    BlockingQueue<Runnable> queue;
+    ListeningScheduledExecutorService thread;
+    int maxTasksInQueue;
+
+    public BoundedScheduledExecutorService(ScheduledThreadPoolExecutor thread, 
int maxTasksInQueue) {
+        this.queue = thread.getQueue();
+        this.thread = MoreExecutors.listeningDecorator(thread);
+        this.maxTasksInQueue = maxTasksInQueue;
+    }
+
+    @Override
+    protected ListeningExecutorService delegate() {
+        return this.thread;
+    }
+
+    @Override
+    public ListenableScheduledFuture<?> schedule(Runnable command, long delay, 
TimeUnit unit) {
+        this.checkQueue();
+        return this.thread.schedule(command, delay, unit);
+    }
+
+    @Override
+    public <V> ListenableScheduledFuture<V> schedule(Callable<V> callable, 
long delay, TimeUnit unit) {
+        this.checkQueue();
+        return this.thread.schedule(callable, delay, unit);
+    }
+
+    @Override
+    public ListenableScheduledFuture<?> scheduleAtFixedRate(Runnable command,
+                                                            long initialDelay, 
long period, TimeUnit unit) {
+        this.checkQueue();
+        return this.thread.scheduleAtFixedRate(command, initialDelay, period, 
unit);
+    }
+
+    @Override
+    public ListenableScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
+                                                               long initialDelay, long delay, TimeUnit unit) {
+        this.checkQueue(); // rejects the task if a positive queue limit is set and already reached
+        return this.thread.scheduleWithFixedDelay(command, initialDelay, delay, unit); // fixed delay, not fixed rate
+    }
+
+    @Override
+    public <T> ListenableFuture<T> submit(Callable<T> task) {
+        this.checkQueue();
+        return super.submit(task);
+    }
+
+    @Override
+    public ListenableFuture<?> submit(Runnable task) {
+        this.checkQueue();
+        return super.submit(task);
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> 
tasks) throws InterruptedException {
+        this.checkQueue();
+        return super.invokeAll(tasks);
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> 
tasks,
+                                         long timeout, TimeUnit unit) throws 
InterruptedException {
+        this.checkQueue();
+        return super.invokeAll(tasks, timeout, unit);
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws 
InterruptedException, ExecutionException {
+        this.checkQueue();
+        return super.invokeAny(tasks);
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long 
timeout,
+                           TimeUnit unit) throws InterruptedException, 
ExecutionException, TimeoutException {
+        this.checkQueue();
+        return super.invokeAny(tasks, timeout, unit);
+    }
+
+    @Override
+    public <T> ListenableFuture<T> submit(Runnable task, T result) {
+        this.checkQueue();
+        return super.submit(task, result);
+    }
+
+    @Override
+    public void execute(Runnable command) {
+        this.checkQueue();
+        super.execute(command);
+    }
+
+    private void checkQueue() {
+        if (this.maxTasksInQueue > 0 && this.queue.size() >= 
this.maxTasksInQueue) {
+            throw new RejectedExecutionException("Queue at limit of " + 
this.maxTasksInQueue + " items");
+        }
+    }
+
+}
+
diff --git 
a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java
 
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java
index 41a7fa0..fb07b1f 100644
--- 
a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java
+++ 
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java
@@ -21,7 +21,6 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningScheduledExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.Random;
 import java.util.concurrent.ExecutionException;
@@ -31,6 +30,7 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
@@ -52,7 +52,7 @@ import org.apache.commons.lang.StringUtils;
  * method.
  */
 public class OrderedScheduler {
-
+    public static final int NO_TASK_LIMIT = -1;
     protected static final long WARN_TIME_MICRO_SEC_DEFAULT = 
TimeUnit.SECONDS.toMicros(1);
 
     final String name;
@@ -63,6 +63,7 @@ public class OrderedScheduler {
     final OpStatsLogger taskPendingStats;
     final boolean traceTaskExecution;
     final long warnTimeMicroSec;
+    final int maxTasksInQueue;
 
     /**
      * Create a builder to build ordered scheduler.
@@ -88,6 +89,7 @@ public class OrderedScheduler {
         protected StatsLogger statsLogger = NullStatsLogger.INSTANCE;
         protected boolean traceTaskExecution = false;
         protected long warnTimeMicroSec = WARN_TIME_MICRO_SEC_DEFAULT;
+        protected int maxTasksInQueue = NO_TASK_LIMIT;
 
         public AbstractBuilder<T> name(String name) {
             this.name = name;
@@ -99,6 +101,11 @@ public class OrderedScheduler {
             return this;
         }
 
+        public AbstractBuilder<T> maxTasksInQueue(int num) {
+            this.maxTasksInQueue = num;
+            return this;
+        }
+
         public AbstractBuilder<T> threadFactory(ThreadFactory threadFactory) {
             this.threadFactory = threadFactory;
             return this;
@@ -130,7 +137,8 @@ public class OrderedScheduler {
                 threadFactory,
                 statsLogger,
                 traceTaskExecution,
-                warnTimeMicroSec);
+                warnTimeMicroSec,
+                maxTasksInQueue);
         }
 
     }
@@ -179,21 +187,24 @@ public class OrderedScheduler {
                                ThreadFactory threadFactory,
                                StatsLogger statsLogger,
                                boolean traceTaskExecution,
-                               long warnTimeMicroSec) {
+                               long warnTimeMicroSec,
+                               int maxTasksInQueue) {
         checkArgument(numThreads > 0);
         checkArgument(!StringUtils.isBlank(baseName));
 
+        this.maxTasksInQueue = maxTasksInQueue;
         this.warnTimeMicroSec = warnTimeMicroSec;
         name = baseName;
         threads = new ListeningScheduledExecutorService[numThreads];
         threadIds = new long[numThreads];
         for (int i = 0; i < numThreads; i++) {
-            final ScheduledThreadPoolExecutor thread =  new 
ScheduledThreadPoolExecutor(1,
+            final ScheduledThreadPoolExecutor thread = new 
ScheduledThreadPoolExecutor(1,
                     new ThreadFactoryBuilder()
                         .setNameFormat(name + "-" + getClass().getSimpleName() 
+ "-" + i + "-%d")
                         .setThreadFactory(threadFactory)
                         .build());
-            threads[i] = MoreExecutors.listeningDecorator(thread);
+            threads[i] = new BoundedScheduledExecutorService(thread, 
this.maxTasksInQueue);
+
             final int idx = i;
             try {
                 threads[idx].submit(new SafeRunnable() {
diff --git a/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto 
b/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
index b43e691..38ed3c5 100644
--- a/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
+++ b/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
@@ -45,6 +45,7 @@ enum StatusCode {
     EBADVERSION = 503;
     EFENCED = 504;
     EREADONLY = 505;
+    ETOOMANYREQUESTS = 506;
 }
 
 /**
diff --git a/bookkeeper-server/conf/bk_server.conf 
b/bookkeeper-server/conf/bk_server.conf
index 11cbff2..b4f37e4 100755
--- a/bookkeeper-server/conf/bk_server.conf
+++ b/bookkeeper-server/conf/bk_server.conf
@@ -147,6 +147,14 @@ journalDirectory=/tmp/bk-txn
 # be handled by netty threads directly.
 # numReadWorkerThreads=1
 
+# If read worker threads are enabled, limit the number of pending requests to
+# prevent the executor queue from growing indefinitely
+# maxPendingReadRequestsPerThread=10000
+
+# If add worker threads are enabled, limit the number of pending requests to
+# prevent the executor queue from growing indefinitely
+# maxPendingAddRequestsPerThread=10000
+
 # Whether force compaction is allowed when the disk is full or almost full.
 # Forcing GC may get some space back, but may also fill up disk space more 
quickly.
 # This is because new log files are created before GC, while old garbage
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
index ea60a72..5ff6641 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
@@ -83,6 +83,8 @@ public abstract class BKException extends 
org.apache.bookkeeper.client.api.BKExc
             return new BKUnclosedFragmentException();
         case Code.WriteOnReadOnlyBookieException:
             return new BKWriteOnReadOnlyBookieException();
+        case Code.TooManyRequestsException:
+            return new BKTooManyRequestsException();
         case Code.ReplicationException:
             return new BKReplicationException();
         case Code.ClientClosedException:
@@ -280,6 +282,12 @@ public abstract class BKException extends 
org.apache.bookkeeper.client.api.BKExc
         }
     }
 
+    public static class BKTooManyRequestsException extends BKException {
+        public BKTooManyRequestsException() {
+            super(Code.TooManyRequestsException);
+        }
+    }
+
     public static class BKReplicationException extends BKException {
         public BKReplicationException() {
             super(Code.ReplicationException);
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BKException.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BKException.java
index 737509d..cf7eda2 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BKException.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BKException.java
@@ -214,7 +214,10 @@ public abstract class BKException extends Exception {
          * Write operations failed due to bookies are readonly.
          */
         int WriteOnReadOnlyBookieException = -104;
-        //-105 reserved for TooManyRequestsException
+        /**
+         * Operations failed due to too many requests in the queue.
+         */
+        int TooManyRequestsException = -105;
         /**
          * Ledger id overflow happens on ledger manager.
          *
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 8dcb1b3..ad7002b 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -127,6 +127,8 @@ public class ServerConfiguration extends 
AbstractConfiguration {
     // Worker Thread parameters.
     protected final static String NUM_ADD_WORKER_THREADS = 
"numAddWorkerThreads";
     protected final static String NUM_READ_WORKER_THREADS = 
"numReadWorkerThreads";
+    protected final static String MAX_PENDING_READ_REQUESTS_PER_THREAD = 
"maxPendingReadRequestsPerThread";
+    protected final static String MAX_PENDING_ADD_REQUESTS_PER_THREAD = 
"maxPendingAddRequestsPerThread";
     protected final static String NUM_LONG_POLL_WORKER_THREADS = 
"numLongPollWorkerThreads";
 
     // Long poll parameters
@@ -1350,6 +1352,48 @@ public class ServerConfiguration extends 
AbstractConfiguration {
     }
 
     /**
+     * Set the max number of pending read requests for each read worker 
thread. After the quota is reached, new requests
+     * will be failed immediately
+     *
+     * @param maxPendingReadRequestsPerThread
+     * @return server configuration
+     */
+    public ServerConfiguration setMaxPendingReadRequestPerThread(int 
maxPendingReadRequestsPerThread) {
+        setProperty(MAX_PENDING_READ_REQUESTS_PER_THREAD, 
maxPendingReadRequestsPerThread);
+        return this;
+    }
+
+    /**
+     * If read workers threads are enabled, limit the number of pending 
requests, to avoid the executor queue to grow
+     * indefinitely (default: 10000 entries)
+     */
+    public int getMaxPendingReadRequestPerThread() {
+        return getInt(MAX_PENDING_READ_REQUESTS_PER_THREAD, 10000);
+    }
+
+    /**
+     * Set the max number of pending add requests for each add worker thread. 
After the quota is reached, new requests
+     * will be failed immediately
+     *
+     * @param maxPendingAddRequestsPerThread
+     * @return server configuration
+     */
+    public ServerConfiguration setMaxPendingAddRequestPerThread(int 
maxPendingAddRequestsPerThread) {
+        setProperty(MAX_PENDING_ADD_REQUESTS_PER_THREAD, 
maxPendingAddRequestsPerThread);
+        return this;
+    }
+
+    /**
+     * If add workers threads are enabled, limit the number of pending 
requests, to avoid the executor queue to grow
+     * indefinitely (default: 10000 entries)
+     */
+    public int getMaxPendingAddRequestPerThread() {
+        return getInt(MAX_PENDING_ADD_REQUESTS_PER_THREAD, 10000);
+    }
+
+
+
+    /**
      * Get the tick duration in milliseconds.
      * @return
      */
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
index 2c7a828..879489a 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
@@ -175,6 +175,11 @@ public interface BookieProtocol {
      */
     public static final int EREADONLY = 105;
 
+    /**
+     * Too many concurrent requests
+     */
+    public static final int ETOOMANYREQUESTS = 106;
+
     public static final short FLAG_NONE = 0x0;
     public static final short FLAG_DO_FENCING = 0x0001;
     public static final short FLAG_RECOVERY_ADD = 0x0002;
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 81208f3..7c51143 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -28,10 +28,12 @@ import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
 import io.netty.util.HashedWheelTimer;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
 import org.apache.bookkeeper.auth.AuthToken;
 import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.processor.RequestProcessor;
 import org.apache.bookkeeper.stats.Counter;
@@ -135,12 +137,14 @@ public class BookieRequestProcessor implements 
RequestProcessor {
             StatsLogger statsLogger, SecurityHandlerFactory shFactory) throws 
SecurityException {
         this.serverCfg = serverCfg;
         this.bookie = bookie;
-        this.readThreadPool = 
createExecutor(this.serverCfg.getNumReadWorkerThreads(), "BookieReadThread-" + 
serverCfg.getBookiePort());
-        this.writeThreadPool = 
createExecutor(this.serverCfg.getNumAddWorkerThreads(), "BookieWriteThread-" + 
serverCfg.getBookiePort());
+        this.readThreadPool = 
createExecutor(this.serverCfg.getNumReadWorkerThreads(), "BookieReadThread-" + 
serverCfg.getBookiePort(),
+                serverCfg.getMaxPendingReadRequestPerThread());
+        this.writeThreadPool = 
createExecutor(this.serverCfg.getNumAddWorkerThreads(), "BookieWriteThread-" + 
serverCfg.getBookiePort(),
+                serverCfg.getMaxPendingAddRequestPerThread());
         this.longPollThreadPool =
             createExecutor(
                 this.serverCfg.getNumLongPollWorkerThreads(),
-                "BookieLongPollThread-" + serverCfg.getBookiePort());
+                "BookieLongPollThread-" + serverCfg.getBookiePort(), 
OrderedScheduler.NO_TASK_LIMIT);
         this.requestTimer = new HashedWheelTimer(
             new 
ThreadFactoryBuilder().setNameFormat("BookieRequestTimer-%d").build(),
             this.serverCfg.getRequestTimerTickDurationMs(),
@@ -180,11 +184,11 @@ public class BookieRequestProcessor implements 
RequestProcessor {
         shutdownExecutor(readThreadPool);
     }
 
-    private OrderedSafeExecutor createExecutor(int numThreads, String 
nameFormat) {
+    private OrderedSafeExecutor createExecutor(int numThreads, String 
nameFormat, int maxTasksInQueue) {
         if (numThreads <= 0) {
             return null;
         } else {
-            return 
OrderedSafeExecutor.newBuilder().numThreads(numThreads).name(nameFormat).build();
+            return 
OrderedSafeExecutor.newBuilder().numThreads(numThreads).name(nameFormat).maxTasksInQueue(maxTasksInQueue).build();
         }
     }
 
@@ -288,7 +292,24 @@ public class BookieRequestProcessor implements 
RequestProcessor {
         if (null == writeThreadPool) {
             write.run();
         } else {
-            writeThreadPool.submitOrdered(r.getAddRequest().getLedgerId(), 
write);
+            try {
+                writeThreadPool.submitOrdered(r.getAddRequest().getLedgerId(), 
write);
+            } catch (RejectedExecutionException e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Failed to process request to add entry at 
{}:{}. Too many pending requests",
+                            r.getAddRequest().getLedgerId(), 
r.getAddRequest().getEntryId());
+                }
+                BookkeeperProtocol.AddResponse.Builder addResponse = 
BookkeeperProtocol.AddResponse.newBuilder()
+                        .setLedgerId(r.getAddRequest().getLedgerId())
+                        .setEntryId(r.getAddRequest().getEntryId())
+                        
.setStatus(BookkeeperProtocol.StatusCode.ETOOMANYREQUESTS);
+                BookkeeperProtocol.Response.Builder response = 
BookkeeperProtocol.Response.newBuilder()
+                        .setHeader(write.getHeader())
+                        .setStatus(addResponse.getStatus())
+                        .setAddResponse(addResponse);
+                BookkeeperProtocol.Response resp = response.build();
+                write.sendResponse(addResponse.getStatus(), resp, 
addRequestStats);
+            }
         }
     }
 
@@ -316,7 +337,25 @@ public class BookieRequestProcessor implements 
RequestProcessor {
             if (null == readThreadPool) {
                 read.run();
             } else {
-                readThreadPool.submitOrdered(r.getReadRequest().getLedgerId(), 
read);
+                try {
+                    
readThreadPool.submitOrdered(r.getReadRequest().getLedgerId(), read);
+                } catch (RejectedExecutionException e) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Failed to process request to read entry at 
{}:{}. Too many pending requests",
+                                r.getReadRequest().getLedgerId(), 
r.getReadRequest().getEntryId());
+                    }
+                    BookkeeperProtocol.ReadResponse.Builder readResponse =
+                            BookkeeperProtocol.ReadResponse.newBuilder()
+                                    .setLedgerId(r.getReadRequest().getLedgerId()) // ids of the rejected read
+                                    .setEntryId(r.getReadRequest().getEntryId())
+                                    .setStatus(BookkeeperProtocol.StatusCode.ETOOMANYREQUESTS);
+                    BookkeeperProtocol.Response.Builder response = 
BookkeeperProtocol.Response.newBuilder()
+                            .setHeader(read.getHeader())
+                            .setStatus(readResponse.getStatus())
+                            .setReadResponse(readResponse);
+                    BookkeeperProtocol.Response resp = response.build();
+                    read.sendResponse(readResponse.getStatus(), resp, 
readRequestStats);
+                }
             }
         }
     }
@@ -378,7 +417,17 @@ public class BookieRequestProcessor implements 
RequestProcessor {
         if (null == writeThreadPool) {
             write.run();
         } else {
-            writeThreadPool.submitOrdered(r.getLedgerId(), write);
+            try {
+                writeThreadPool.submitOrdered(r.getLedgerId(), write);
+            } catch (RejectedExecutionException e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Failed to process request to add entry at 
{}:{}. Too many pending requests", r.ledgerId,
+                            r.entryId);
+                }
+
+                write.sendResponse(BookieProtocol.ETOOMANYREQUESTS,
+                        
ResponseBuilder.buildErrorResponse(BookieProtocol.ETOOMANYREQUESTS, r), 
addRequestStats);
+            }
         }
     }
 
@@ -387,7 +436,17 @@ public class BookieRequestProcessor implements 
RequestProcessor {
         if (null == readThreadPool) {
             read.run();
         } else {
-            readThreadPool.submitOrdered(r.getLedgerId(), read);
+            try {
+                readThreadPool.submitOrdered(r.getLedgerId(), read);
+            } catch (RejectedExecutionException e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Failed to process request to read entry at 
{}:{}. Too many pending requests", r.ledgerId,
+                            r.entryId);
+                }
+
+                read.sendResponse(BookieProtocol.ETOOMANYREQUESTS,
+                        
ResponseBuilder.buildErrorResponse(BookieProtocol.ETOOMANYREQUESTS, r), 
readRequestStats);
+            }
         }
     }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 0c8f25a..c392c79 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -1040,6 +1040,8 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
                 return StatusCode.EFENCED;
             case BookieProtocol.EREADONLY:
                 return StatusCode.EREADONLY;
+            case BookieProtocol.ETOOMANYREQUESTS:
+                return StatusCode.ETOOMANYREQUESTS;
             default:
                 throw new IllegalArgumentException("Invalid error code: " + 
errorCode);
         }
@@ -1771,6 +1773,9 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
             case EREADONLY:
                 rcToRet = BKException.Code.WriteOnReadOnlyBookieException;
                 break;
+            case ETOOMANYREQUESTS:
+                rcToRet = BKException.Code.TooManyRequestsException;
+                break;
             default:
                 break;
         }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
index d832d18..8df8d5b 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
@@ -58,7 +58,7 @@ public class OrderedSafeExecutor extends 
org.apache.bookkeeper.common.util.Order
                 threadFactory = Executors.defaultThreadFactory();
             }
             return new OrderedSafeExecutor(name, numThreads, threadFactory, 
statsLogger,
-                                           traceTaskExecution, 
warnTimeMicroSec);
+                                           traceTaskExecution, 
warnTimeMicroSec, maxTasksInQueue);
         }
 
     }
@@ -78,11 +78,13 @@ public class OrderedSafeExecutor extends 
org.apache.bookkeeper.common.util.Order
      *            - should we stat task execution
      * @param warnTimeMicroSec
      *            - log long task exec warning after this interval
+     * @param maxTasksInQueue
+     *            - maximum items allowed in a thread queue. -1 for no limit
      */
     private OrderedSafeExecutor(String baseName, int numThreads, ThreadFactory 
threadFactory,
                                 StatsLogger statsLogger, boolean 
traceTaskExecution,
-                                long warnTimeMicroSec) {
-        super(baseName, numThreads, threadFactory, statsLogger, 
traceTaskExecution, warnTimeMicroSec);
+                                long warnTimeMicroSec, int maxTasksInQueue) {
+        super(baseName, numThreads, threadFactory, statsLogger, 
traceTaskExecution, warnTimeMicroSec, maxTasksInQueue);
     }
 
     /**
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestMaxSizeWorkersQueue.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestMaxSizeWorkersQueue.java
new file mode 100644
index 0000000..49617ec
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestMaxSizeWorkersQueue.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Enumeration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Test;
+
+
+public class TestMaxSizeWorkersQueue extends BookKeeperClusterTestCase {
+    DigestType digestType = DigestType.CRC32;
+
+    public TestMaxSizeWorkersQueue() {
+        super(1);
+
+        baseConf.setNumReadWorkerThreads(1);
+        baseConf.setNumAddWorkerThreads(1);
+
+        // Configure very small queue sizes
+        baseConf.setMaxPendingReadRequestPerThread(1);
+        baseConf.setMaxPendingAddRequestPerThread(1);
+    }
+
+    @Test(timeout = 60000)
+    public void testReadRejected() throws Exception {
+        LedgerHandle lh = bkc.createLedger(1, 1, digestType, new byte[0]);
+        byte[] content = new byte[100];
+
+        final int N = 1000;
+        // Write a few entries
+        for (int i = 0; i < N; i++) {
+            lh.addEntry(content);
+        }
+
+        // Read asynchronously:
+        // - 1st read must always succeed
+        // - Subsequent reads may fail, depending on timing
+        // - At least a few, we expect to fail with TooManyRequestsException
+        final CountDownLatch counter = new CountDownLatch(2);
+
+        final AtomicInteger rcFirstReadOperation = new AtomicInteger();
+
+        lh.asyncReadEntries(0, 0, new ReadCallback() {
+            @Override
+            public void readComplete(int rc, LedgerHandle lh, 
Enumeration<LedgerEntry> seq, Object ctx) {
+                rcFirstReadOperation.set(rc);
+                counter.countDown();
+            }
+        }, lh);
+
+        final AtomicInteger rcSecondReadOperation = new AtomicInteger();
+
+        lh.asyncReadEntries(0, N - 1, new ReadCallback() {
+            @Override
+            public void readComplete(int rc, LedgerHandle lh, 
Enumeration<LedgerEntry> seq, Object ctx) {
+                rcSecondReadOperation.set(rc);
+                counter.countDown();
+            }
+        }, lh);
+
+        counter.await();
+
+        assertEquals(BKException.Code.OK, rcFirstReadOperation.get());
+        assertEquals(BKException.Code.TooManyRequestsException, 
rcSecondReadOperation.get());
+    }
+
+    @Test(timeout = 60000)
+    public void testAddRejected() throws Exception {
+        LedgerHandle lh = bkc.createLedger(1, 1, digestType, new byte[0]);
+        byte[] content = new byte[100];
+
+        final int N = 1000;
+
+        // Write asynchronously, and expect at least a few writes to have failed 
with NotEnoughBookies,
+        // because when we get the TooManyRequestsException, the client will 
try to form a new ensemble and that
+        // operation will fail since we only have 1 bookie available
+        final CountDownLatch counter = new CountDownLatch(N);
+        final AtomicBoolean receivedTooManyRequestsException = new 
AtomicBoolean();
+
+        // Write a few entries
+        for (int i = 0; i < N; i++) {
+            lh.asyncAddEntry(content, new AddCallback() {
+                @Override
+                public void addComplete(int rc, LedgerHandle lh, long entryId, 
Object ctx) {
+                    if (rc == BKException.Code.NotEnoughBookiesException) {
+                        receivedTooManyRequestsException.set(true);
+                    }
+
+                    counter.countDown();
+                }
+            }, null);
+        }
+
+        counter.await();
+
+        assertTrue(receivedTooManyRequestsException.get());
+    }
+}

-- 
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].

Reply via email to