This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 82eae5d  Refactor the try-lock code pattern (#10742)
82eae5d is described below

commit 82eae5dc446b3167e37b67db908b909d44d1016d
Author: Yang Yang <[email protected]>
AuthorDate: Sun May 30 18:25:32 2021 +0800

    Refactor the try-lock code pattern (#10742)
    
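    It's recommended that calls to Lock#lock be immediately followed by 
a `try` block with a `finally` clause that releases the lock. If the lock is 
acquired inside the `try` block and the acquisition fails, the `finally` 
clause still runs and tries to unlock a lock that was never held. I came 
across a few pieces of code that didn't follow this pattern and fixed them 
for consistency.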
    
    References:
    https://errorprone.info/bugpattern/LockNotBeforeTry
    
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/ReentrantLock.html
---
 .../src/main/java/org/apache/pulsar/broker/PulsarService.java |  3 +--
 .../main/java/org/apache/pulsar/client/impl/ConsumerBase.java |  2 +-
 .../client/impl/PersistentAcknowledgmentsGroupingTracker.java |  8 ++++----
 .../pulsar/functions/instance/JavaInstanceRunnable.java       | 11 +++++------
 .../org/apache/pulsar/functions/windowing/WindowManager.java  |  4 ++--
 .../apache/pulsar/functions/worker/PulsarWorkerService.java   |  2 +-
 .../org/apache/pulsar/functions/worker/SchedulerManager.java  |  8 +++-----
 .../pulsar/sql/presto/util/NoStrictCacheSizeAllocator.java    |  4 ++--
 8 files changed, 19 insertions(+), 23 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 667b390..6bb1a16 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -560,8 +560,6 @@ public class PulsarService implements AutoCloseable {
      * Start the pulsar service instance.
      */
     public void start() throws PulsarServerException {
-        mutex.lock();
-
         LOG.info("Starting Pulsar Broker service; version: '{}'",
                 (brokerVersion != null ? brokerVersion : "unknown"));
         LOG.info("Git Revision {}", PulsarVersion.getGitSha());
@@ -570,6 +568,7 @@ public class PulsarService implements AutoCloseable {
                 PulsarVersion.getBuildHost(),
                 PulsarVersion.getBuildTime());
 
+        mutex.lock();
         try {
             if (state != State.Init) {
                 throw new PulsarServerException("Cannot start the service once 
it was stopped");
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 3b16fa0..5c02904 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -780,8 +780,8 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
             return;
         }
 
+        reentrantLock.lock();
         try {
-            reentrantLock.lock();
             notifyPendingBatchReceivedCallBack(opBatchReceive);
         } finally {
             reentrantLock.unlock();
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index 09d6bd5..0f34e08 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -141,11 +141,11 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
             }
         } else {
             if (isAckReceiptEnabled(consumer.getClientCnx())) {
+                // when flush the ack, we should bind the this ack in the 
currentFuture, during this time we can't
+                // change currentFuture. but we can lock by the read lock, 
because the currentFuture is not change
+                // any ack operation is allowed.
+                this.lock.readLock().lock();
                 try {
-                    // when flush the ack, we should bind the this ack in the 
currentFuture, during this time we can't
-                    // change currentFuture. but we can lock by the read lock, 
because the currentFuture is not change
-                    // any ack operation is allowed.
-                    this.lock.readLock().lock();
                     if (messageIds.size() != 0) {
                         addListAcknowledgment(messageIds);
                         return this.currentIndividualAckFuture;
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 92bbd62..e36dd26 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -456,8 +456,8 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
 
     public String getStatsAsString() throws IOException {
         if (isInitialized) {
+            statsLock.readLock().lock();
             try {
-                statsLock.readLock().lock();
                 return stats.getStatsAsString();
             } finally {
                 statsLock.readLock().unlock();
@@ -468,8 +468,8 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
 
     public InstanceCommunication.MetricsData getAndResetMetrics() {
         if (isInitialized) {
+            statsLock.writeLock().lock();
             try {
-                statsLock.writeLock().lock();
                 InstanceCommunication.MetricsData metricsData = 
internalGetMetrics();
                 internalResetMetrics();
                 return metricsData;
@@ -482,8 +482,8 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
 
     public InstanceCommunication.MetricsData getMetrics() {
         if (isInitialized) {
+            statsLock.readLock().lock();
             try {
-                statsLock.readLock().lock();
                 return internalGetMetrics();
             } finally {
                 statsLock.readLock().unlock();
@@ -494,8 +494,8 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
 
     public void resetMetrics() {
         if (isInitialized) {
+            statsLock.writeLock().lock();
             try {
-                statsLock.writeLock().lock();
                 internalResetMetrics();
             } finally {
                 statsLock.writeLock().unlock();
@@ -540,9 +540,8 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
     public InstanceCommunication.FunctionStatus.Builder getFunctionStatus() {
         InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = 
InstanceCommunication.FunctionStatus.newBuilder();
         if (isInitialized) {
+            statsLock.readLock().lock();
             try {
-                statsLock.readLock().lock();
-
                 functionStatusBuilder.setNumReceived((long) 
stats.getTotalRecordsReceived());
                 functionStatusBuilder.setNumSuccessfullyProcessed((long) 
stats.getTotalProcessedSuccessfully());
                 functionStatusBuilder.setNumUserExceptions((long) 
stats.getTotalUserExceptions());
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowManager.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowManager.java
index f52aa41..06e0b88 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowManager.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowManager.java
@@ -121,8 +121,8 @@ public class WindowManager<T> implements TriggerHandler {
         List<Event<T>> windowEvents = null;
         List<Event<T>> expired = null;
 
+        lock.lock();
         try {
-            lock.lock();
     /*
      * scan the entire window to handle out of order events in
      * the case of time based windows.
@@ -196,8 +196,8 @@ public class WindowManager<T> implements TriggerHandler {
         List<Event<T>> eventsToExpire = new ArrayList<>();
         List<Event<T>> eventsToProcess = new ArrayList<>();
 
+        lock.lock();
         try {
-            lock.lock();
             Iterator<Event<T>> it = queue.iterator();
             while (it.hasNext()) {
                 Event<T> windowEvent = it.next();
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
index d15a594..07fb0de 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
@@ -530,8 +530,8 @@ public class PulsarWorkerService implements WorkerService {
                     () -> {
                         // computing a new schedule and checking for failures 
cannot happen concurrently
                         // both paths of code modify internally cached 
assignments map in function runtime manager
+                        schedulerManager.getSchedulerLock().lock();
                         try {
-                            schedulerManager.getSchedulerLock().lock();
                             membershipManager.checkFailures(
                                     functionMetaDataManager, 
functionRuntimeManager, schedulerManager);
                         } finally {
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index cc3e371..7562e28 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -176,9 +176,8 @@ public class SchedulerManager implements AutoCloseable {
 
         try {
             return executorService.submit(() -> {
+                schedulerLock.lock();
                 try {
-                    schedulerLock.lock();
-
                     boolean isLeader = leaderService.isLeader();
                     if (isLeader) {
                         try {
@@ -501,10 +500,9 @@ public class SchedulerManager implements AutoCloseable {
     @Override
     public synchronized void close() {
         log.info("Closing scheduler manager");
+        // make sure we are not closing while a scheduling is being calculated
+        schedulerLock.lock();
         try {
-            // make sure we are not closing while a scheduling is being 
calculated
-            schedulerLock.lock();
-
             isRunning = false;
 
             if (scheduledExecutorService != null) {
diff --git 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/util/NoStrictCacheSizeAllocator.java
 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/util/NoStrictCacheSizeAllocator.java
index 490534f..60eea36 100644
--- 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/util/NoStrictCacheSizeAllocator.java
+++ 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/util/NoStrictCacheSizeAllocator.java
@@ -54,8 +54,8 @@ public class NoStrictCacheSizeAllocator implements 
CacheSizeAllocator {
      * @param size allocate size
      */
     public void allocate(long size) {
+        lock.lock();
         try {
-            lock.lock();
             availableCacheSize.add(-size);
         } finally {
             lock.unlock();
@@ -69,8 +69,8 @@ public class NoStrictCacheSizeAllocator implements 
CacheSizeAllocator {
      * @param size release size
      */
     public void release(long size) {
+        lock.lock();
         try {
-            lock.lock();
             availableCacheSize.add(size);
             if (availableCacheSize.longValue() > maxCacheSize) {
                 availableCacheSize.reset();

Reply via email to