This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 82eae5d Refactor the try-lock code pattern (#10742)
82eae5d is described below
commit 82eae5dc446b3167e37b67db908b909d44d1016d
Author: Yang Yang <[email protected]>
AuthorDate: Sun May 30 18:25:32 2021 +0800
Refactor the try-lock code pattern (#10742)
It's recommended that calls to Lock#lock be immediately followed by
a `try` block with a `finally` clause that releases the lock. I came across a
few pieces of code that didn't follow this pattern and fixed them for
consistency.
References:
https://errorprone.info/bugpattern/LockNotBeforeTry
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/ReentrantLock.html
---
.../src/main/java/org/apache/pulsar/broker/PulsarService.java | 3 +--
.../main/java/org/apache/pulsar/client/impl/ConsumerBase.java | 2 +-
.../client/impl/PersistentAcknowledgmentsGroupingTracker.java | 8 ++++----
.../pulsar/functions/instance/JavaInstanceRunnable.java | 11 +++++------
.../org/apache/pulsar/functions/windowing/WindowManager.java | 4 ++--
.../apache/pulsar/functions/worker/PulsarWorkerService.java | 2 +-
.../org/apache/pulsar/functions/worker/SchedulerManager.java | 8 +++-----
.../pulsar/sql/presto/util/NoStrictCacheSizeAllocator.java | 4 ++--
8 files changed, 19 insertions(+), 23 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 667b390..6bb1a16 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -560,8 +560,6 @@ public class PulsarService implements AutoCloseable {
* Start the pulsar service instance.
*/
public void start() throws PulsarServerException {
- mutex.lock();
-
LOG.info("Starting Pulsar Broker service; version: '{}'",
(brokerVersion != null ? brokerVersion : "unknown"));
LOG.info("Git Revision {}", PulsarVersion.getGitSha());
@@ -570,6 +568,7 @@ public class PulsarService implements AutoCloseable {
PulsarVersion.getBuildHost(),
PulsarVersion.getBuildTime());
+ mutex.lock();
try {
if (state != State.Init) {
throw new PulsarServerException("Cannot start the service once
it was stopped");
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 3b16fa0..5c02904 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -780,8 +780,8 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
return;
}
+ reentrantLock.lock();
try {
- reentrantLock.lock();
notifyPendingBatchReceivedCallBack(opBatchReceive);
} finally {
reentrantLock.unlock();
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index 09d6bd5..0f34e08 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -141,11 +141,11 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
}
} else {
if (isAckReceiptEnabled(consumer.getClientCnx())) {
+ // when flush the ack, we should bind the this ack in the
currentFuture, during this time we can't
+ // change currentFuture. but we can lock by the read lock,
because the currentFuture is not change
+ // any ack operation is allowed.
+ this.lock.readLock().lock();
try {
- // when flush the ack, we should bind the this ack in the
currentFuture, during this time we can't
- // change currentFuture. but we can lock by the read lock,
because the currentFuture is not change
- // any ack operation is allowed.
- this.lock.readLock().lock();
if (messageIds.size() != 0) {
addListAcknowledgment(messageIds);
return this.currentIndividualAckFuture;
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 92bbd62..e36dd26 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -456,8 +456,8 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
public String getStatsAsString() throws IOException {
if (isInitialized) {
+ statsLock.readLock().lock();
try {
- statsLock.readLock().lock();
return stats.getStatsAsString();
} finally {
statsLock.readLock().unlock();
@@ -468,8 +468,8 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
public InstanceCommunication.MetricsData getAndResetMetrics() {
if (isInitialized) {
+ statsLock.writeLock().lock();
try {
- statsLock.writeLock().lock();
InstanceCommunication.MetricsData metricsData =
internalGetMetrics();
internalResetMetrics();
return metricsData;
@@ -482,8 +482,8 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
public InstanceCommunication.MetricsData getMetrics() {
if (isInitialized) {
+ statsLock.readLock().lock();
try {
- statsLock.readLock().lock();
return internalGetMetrics();
} finally {
statsLock.readLock().unlock();
@@ -494,8 +494,8 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
public void resetMetrics() {
if (isInitialized) {
+ statsLock.writeLock().lock();
try {
- statsLock.writeLock().lock();
internalResetMetrics();
} finally {
statsLock.writeLock().unlock();
@@ -540,9 +540,8 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
public InstanceCommunication.FunctionStatus.Builder getFunctionStatus() {
InstanceCommunication.FunctionStatus.Builder functionStatusBuilder =
InstanceCommunication.FunctionStatus.newBuilder();
if (isInitialized) {
+ statsLock.readLock().lock();
try {
- statsLock.readLock().lock();
-
functionStatusBuilder.setNumReceived((long)
stats.getTotalRecordsReceived());
functionStatusBuilder.setNumSuccessfullyProcessed((long)
stats.getTotalProcessedSuccessfully());
functionStatusBuilder.setNumUserExceptions((long)
stats.getTotalUserExceptions());
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowManager.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowManager.java
index f52aa41..06e0b88 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowManager.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowManager.java
@@ -121,8 +121,8 @@ public class WindowManager<T> implements TriggerHandler {
List<Event<T>> windowEvents = null;
List<Event<T>> expired = null;
+ lock.lock();
try {
- lock.lock();
/*
* scan the entire window to handle out of order events in
* the case of time based windows.
@@ -196,8 +196,8 @@ public class WindowManager<T> implements TriggerHandler {
List<Event<T>> eventsToExpire = new ArrayList<>();
List<Event<T>> eventsToProcess = new ArrayList<>();
+ lock.lock();
try {
- lock.lock();
Iterator<Event<T>> it = queue.iterator();
while (it.hasNext()) {
Event<T> windowEvent = it.next();
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
index d15a594..07fb0de 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
@@ -530,8 +530,8 @@ public class PulsarWorkerService implements WorkerService {
() -> {
// computing a new schedule and checking for failures
cannot happen concurrently
// both paths of code modify internally cached
assignments map in function runtime manager
+ schedulerManager.getSchedulerLock().lock();
try {
- schedulerManager.getSchedulerLock().lock();
membershipManager.checkFailures(
functionMetaDataManager,
functionRuntimeManager, schedulerManager);
} finally {
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index cc3e371..7562e28 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -176,9 +176,8 @@ public class SchedulerManager implements AutoCloseable {
try {
return executorService.submit(() -> {
+ schedulerLock.lock();
try {
- schedulerLock.lock();
-
boolean isLeader = leaderService.isLeader();
if (isLeader) {
try {
@@ -501,10 +500,9 @@ public class SchedulerManager implements AutoCloseable {
@Override
public synchronized void close() {
log.info("Closing scheduler manager");
+ // make sure we are not closing while a scheduling is being calculated
+ schedulerLock.lock();
try {
- // make sure we are not closing while a scheduling is being
calculated
- schedulerLock.lock();
-
isRunning = false;
if (scheduledExecutorService != null) {
diff --git
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/util/NoStrictCacheSizeAllocator.java
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/util/NoStrictCacheSizeAllocator.java
index 490534f..60eea36 100644
---
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/util/NoStrictCacheSizeAllocator.java
+++
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/util/NoStrictCacheSizeAllocator.java
@@ -54,8 +54,8 @@ public class NoStrictCacheSizeAllocator implements
CacheSizeAllocator {
* @param size allocate size
*/
public void allocate(long size) {
+ lock.lock();
try {
- lock.lock();
availableCacheSize.add(-size);
} finally {
lock.unlock();
@@ -69,8 +69,8 @@ public class NoStrictCacheSizeAllocator implements
CacheSizeAllocator {
* @param size release size
*/
public void release(long size) {
+ lock.lock();
try {
- lock.lock();
availableCacheSize.add(size);
if (availableCacheSize.longValue() > maxCacheSize) {
availableCacheSize.reset();