eolivelli commented on code in PR #3348:
URL: https://github.com/apache/bookkeeper/pull/3348#discussion_r902219872


##########
bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/BoundedScheduledExecutorService.java:
##########
@@ -59,78 +59,102 @@ protected ListeningExecutorService delegate() {
 
     @Override
     public ListenableScheduledFuture<?> schedule(Runnable command, long delay, 
TimeUnit unit) {
-        this.checkQueue(1);
-        return this.thread.schedule(command, delay, unit);
+        synchronized (this) {
+            this.checkQueue(1);
+            return this.thread.schedule(command, delay, unit);
+        }
     }
 
     @Override
     public <V> ListenableScheduledFuture<V> schedule(Callable<V> callable, 
long delay, TimeUnit unit) {
-        this.checkQueue(1);
-        return this.thread.schedule(callable, delay, unit);
+        synchronized (this) {
+            this.checkQueue(1);
+            return this.thread.schedule(callable, delay, unit);
+        }
     }
 
     @Override
     public ListenableScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                             long initialDelay, 
long period, TimeUnit unit) {
-        this.checkQueue(1);
-        return this.thread.scheduleAtFixedRate(command, initialDelay, period, 
unit);
+        synchronized (this) {
+            this.checkQueue(1);
+            return this.thread.scheduleAtFixedRate(command, initialDelay, 
period, unit);
+        }
     }
 
     @Override
     public ListenableScheduledFuture<?> scheduleWithFixedDelay(Runnable 
command,
                                                                long 
initialDelay, long delay, TimeUnit unit) {
-        this.checkQueue(1);
-        return this.thread.scheduleAtFixedRate(command, initialDelay, delay, 
unit);
+        synchronized (this) {
+            this.checkQueue(1);
+            return this.thread.scheduleAtFixedRate(command, initialDelay, 
delay, unit);
+        }
     }
 
     @Override
     public <T> ListenableFuture<T> submit(Callable<T> task) {
-        this.checkQueue(1);
-        return super.submit(task);
+        synchronized (this) {
+            this.checkQueue(1);
+            return super.submit(task);
+        }
     }
 
     @Override
     public ListenableFuture<?> submit(Runnable task) {
-        this.checkQueue(1);
-        return super.submit(task);
+        synchronized (this) {
+            this.checkQueue(1);
+            return super.submit(task);
+        }
     }
 
     @Override
     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> 
tasks) throws InterruptedException {
-        this.checkQueue(tasks.size());
-        return super.invokeAll(tasks);
+        synchronized (this) {
+            this.checkQueue(tasks.size());
+            return super.invokeAll(tasks);
+        }
     }
 
     @Override
     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> 
tasks,
                                          long timeout, TimeUnit unit) throws 
InterruptedException {
-        this.checkQueue(tasks.size());
-        return super.invokeAll(tasks, timeout, unit);
+        synchronized (this) {
+            this.checkQueue(tasks.size());
+            return super.invokeAll(tasks, timeout, unit);
+        }
     }
 
     @Override
     public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws 
InterruptedException, ExecutionException {
-        this.checkQueue(tasks.size());
-        return super.invokeAny(tasks);
+        synchronized (this) {
+            this.checkQueue(tasks.size());
+            return super.invokeAny(tasks);
+        }
     }
 
     @Override
     public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long 
timeout,
                            TimeUnit unit) throws InterruptedException, 
ExecutionException, TimeoutException {
-        this.checkQueue(tasks.size());
-        return super.invokeAny(tasks, timeout, unit);
+        synchronized (this) {
+            this.checkQueue(tasks.size());
+            return super.invokeAny(tasks, timeout, unit);
+        }
     }
 
     @Override
     public <T> ListenableFuture<T> submit(Runnable task, T result) {
-        this.checkQueue(1);
-        return super.submit(task, result);
+        synchronized (this) {
+            this.checkQueue(1);
+            return super.submit(task, result);
+        }
     }
 
     @Override
     public void execute(Runnable command) {
-        this.checkQueue(1);
-        super.execute(command);
+        synchronized (this) {

Review Comment:
   A style note: you can move `synchronized` to the method signature instead of wrapping the entire method body in a `synchronized (this)` block.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to