This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 77cc8ffbab4 KAFKA-17948: Potential issue during tryComplete and
onComplete simultaneous calls to access global variables (#17739)
77cc8ffbab4 is described below
commit 77cc8ffbab40c81e7df62e77fcbd46dbad96aac9
Author: Abhinav Dixit <[email protected]>
AuthorDate: Fri Nov 15 23:56:54 2024 +0530
KAFKA-17948: Potential issue during tryComplete and onComplete simultaneous
calls to access global variables (#17739)
Reviewers: Andrew Schofield <[email protected]>, Apoorv Mittal
<[email protected]>, Jun Rao <[email protected]>
---
.../java/kafka/server/share/DelayedShareFetch.java | 88 ++++++++++++----------
.../kafka/server/share/DelayedShareFetchTest.java | 36 +++++++--
.../server/share/SharePartitionManagerTest.java | 8 ++
.../kafka/server/purgatory/DelayedOperation.java | 4 +-
4 files changed, 91 insertions(+), 45 deletions(-)
diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
index a5cd79ee358..9f1ad3ef651 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -37,11 +37,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
-import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.locks.Lock;
import java.util.stream.Collectors;
import scala.Tuple2;
@@ -58,13 +58,12 @@ public class DelayedShareFetch extends DelayedOperation {
private final ShareFetch shareFetch;
private final ReplicaManager replicaManager;
-
- private Map<TopicIdPartition, FetchRequest.PartitionData>
partitionsAcquired;
- private Map<TopicIdPartition, LogReadResult> partitionsAlreadyFetched;
private final SharePartitionManager sharePartitionManager;
// The topic partitions that need to be completed for the share fetch
request are given by sharePartitions.
// sharePartitions is a subset of shareFetchData. The order of
insertion/deletion of entries in sharePartitions is important.
private final LinkedHashMap<TopicIdPartition, SharePartition>
sharePartitions;
+ private LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData>
partitionsAcquired;
+ private LinkedHashMap<TopicIdPartition, LogReadResult>
partitionsAlreadyFetched;
DelayedShareFetch(
ShareFetch shareFetch,
@@ -91,31 +90,39 @@ public class DelayedShareFetch extends DelayedOperation {
*/
@Override
public void onComplete() {
+ // We are utilizing lock so that onComplete doesn't do a dirty read
for global variables -
+ // partitionsAcquired and partitionsAlreadyFetched, since these
variables can get updated in a different tryComplete thread.
+ lock.lock();
log.trace("Completing the delayed share fetch request for group {},
member {}, "
+ "topic partitions {}", shareFetch.groupId(),
shareFetch.memberId(),
partitionsAcquired.keySet());
- if (shareFetch.isCompleted())
- return;
+ try {
+ LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData>
topicPartitionData;
+ // tryComplete did not invoke forceComplete, so we need to check
if we have any partitions to fetch.
+ if (partitionsAcquired.isEmpty())
+ topicPartitionData = acquirablePartitions();
+ // tryComplete invoked forceComplete, so we can use the data from
tryComplete.
+ else
+ topicPartitionData = partitionsAcquired;
- Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData;
- // tryComplete did not invoke forceComplete, so we need to check if we
have any partitions to fetch.
- if (partitionsAcquired.isEmpty())
- topicPartitionData = acquirablePartitions();
- // tryComplete invoked forceComplete, so we can use the data from
tryComplete.
- else
- topicPartitionData = partitionsAcquired;
+ if (topicPartitionData.isEmpty()) {
+ // No locks for share partitions could be acquired, so we
complete the request with an empty response.
+ shareFetch.maybeComplete(Collections.emptyMap());
+ return;
+ }
+ log.trace("Fetchable share partitions data: {} with groupId: {}
fetch params: {}",
+ topicPartitionData, shareFetch.groupId(),
shareFetch.fetchParams());
- if (topicPartitionData.isEmpty()) {
- // No locks for share partitions could be acquired, so we complete
the request with an empty response.
- shareFetch.maybeComplete(Collections.emptyMap());
- return;
+ completeShareFetchRequest(topicPartitionData);
+ } finally {
+ lock.unlock();
}
- log.trace("Fetchable share partitions data: {} with groupId: {} fetch
params: {}",
- topicPartitionData, shareFetch.groupId(),
shareFetch.fetchParams());
+ }
+ private void completeShareFetchRequest(LinkedHashMap<TopicIdPartition,
FetchRequest.PartitionData> topicPartitionData) {
try {
- Map<TopicIdPartition, LogReadResult> responseData;
+ LinkedHashMap<TopicIdPartition, LogReadResult> responseData;
if (partitionsAlreadyFetched.isEmpty())
responseData = readFromLog(topicPartitionData);
else
@@ -123,7 +130,7 @@ public class DelayedShareFetch extends DelayedOperation {
// updated in a different tryComplete thread.
responseData = combineLogReadResponse(topicPartitionData,
partitionsAlreadyFetched);
- Map<TopicIdPartition, FetchPartitionData> fetchPartitionsData =
new LinkedHashMap<>();
+ LinkedHashMap<TopicIdPartition, FetchPartitionData>
fetchPartitionsData = new LinkedHashMap<>();
for (Map.Entry<TopicIdPartition, LogReadResult> entry :
responseData.entrySet())
fetchPartitionsData.put(entry.getKey(),
entry.getValue().toFetchPartitionData(false));
@@ -150,14 +157,14 @@ public class DelayedShareFetch extends DelayedOperation {
*/
@Override
public boolean tryComplete() {
- Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData =
acquirablePartitions();
+ LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData>
topicPartitionData = acquirablePartitions();
try {
if (!topicPartitionData.isEmpty()) {
// In case, fetch offset metadata doesn't exist for one or
more topic partitions, we do a
// replicaManager.readFromLog to populate the offset metadata
and update the fetch offset metadata for
// those topic partitions.
- Map<TopicIdPartition, LogReadResult>
replicaManagerReadResponse = maybeReadFromLog(topicPartitionData);
+ LinkedHashMap<TopicIdPartition, LogReadResult>
replicaManagerReadResponse = maybeReadFromLog(topicPartitionData);
maybeUpdateFetchOffsetMetadata(replicaManagerReadResponse);
if (anyPartitionHasLogReadError(replicaManagerReadResponse) ||
isMinBytesSatisfied(topicPartitionData)) {
partitionsAcquired = topicPartitionData;
@@ -194,9 +201,9 @@ public class DelayedShareFetch extends DelayedOperation {
* Prepare fetch request structure for partitions in the share fetch
request for which we can acquire records.
*/
// Visible for testing
- Map<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
+ LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData>
acquirablePartitions() {
// Initialize the topic partitions for which the fetch should be
attempted.
- Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData =
new LinkedHashMap<>();
+ LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData>
topicPartitionData = new LinkedHashMap<>();
sharePartitions.forEach((topicIdPartition, sharePartition) -> {
int partitionMaxBytes =
shareFetch.partitionMaxBytes().getOrDefault(topicIdPartition, 0);
@@ -231,8 +238,8 @@ public class DelayedShareFetch extends DelayedOperation {
return topicPartitionData;
}
- private Map<TopicIdPartition, LogReadResult>
maybeReadFromLog(Map<TopicIdPartition, FetchRequest.PartitionData>
topicPartitionData) {
- Map<TopicIdPartition, FetchRequest.PartitionData>
partitionsMissingFetchOffsetMetadata = new LinkedHashMap<>();
+ private LinkedHashMap<TopicIdPartition, LogReadResult>
maybeReadFromLog(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData>
topicPartitionData) {
+ LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData>
partitionsMissingFetchOffsetMetadata = new LinkedHashMap<>();
topicPartitionData.forEach((topicIdPartition, partitionData) -> {
SharePartition sharePartition =
sharePartitions.get(topicIdPartition);
if (sharePartition.fetchOffsetMetadata().isEmpty()) {
@@ -240,14 +247,14 @@ public class DelayedShareFetch extends DelayedOperation {
}
});
if (partitionsMissingFetchOffsetMetadata.isEmpty()) {
- return Collections.emptyMap();
+ return new LinkedHashMap<>();
}
// We fetch data from replica manager corresponding to the topic
partitions that have missing fetch offset metadata.
return readFromLog(partitionsMissingFetchOffsetMetadata);
}
private void maybeUpdateFetchOffsetMetadata(
- Map<TopicIdPartition, LogReadResult> replicaManagerReadResponseData) {
+ LinkedHashMap<TopicIdPartition, LogReadResult>
replicaManagerReadResponseData) {
for (Map.Entry<TopicIdPartition, LogReadResult> entry :
replicaManagerReadResponseData.entrySet()) {
TopicIdPartition topicIdPartition = entry.getKey();
SharePartition sharePartition =
sharePartitions.get(topicIdPartition);
@@ -262,7 +269,7 @@ public class DelayedShareFetch extends DelayedOperation {
}
// minByes estimation currently assumes the common case where all fetched
data is acquirable.
- private boolean isMinBytesSatisfied(Map<TopicIdPartition,
FetchRequest.PartitionData> topicPartitionData) {
+ private boolean isMinBytesSatisfied(LinkedHashMap<TopicIdPartition,
FetchRequest.PartitionData> topicPartitionData) {
long accumulatedSize = 0;
for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry :
topicPartitionData.entrySet()) {
TopicIdPartition topicIdPartition = entry.getKey();
@@ -324,11 +331,11 @@ public class DelayedShareFetch extends DelayedOperation {
}
- private Map<TopicIdPartition, LogReadResult>
readFromLog(Map<TopicIdPartition, FetchRequest.PartitionData>
topicPartitionData) {
+ private LinkedHashMap<TopicIdPartition, LogReadResult>
readFromLog(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData>
topicPartitionData) {
// Filter if there already exists any erroneous topic partition.
Set<TopicIdPartition> partitionsToFetch =
shareFetch.filterErroneousTopicPartitions(topicPartitionData.keySet());
if (partitionsToFetch.isEmpty()) {
- return Collections.emptyMap();
+ return new LinkedHashMap<>();
}
Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult =
replicaManager.readFromLog(
@@ -340,7 +347,7 @@ public class DelayedShareFetch extends DelayedOperation {
QuotaFactory.UNBOUNDED_QUOTA,
true);
- Map<TopicIdPartition, LogReadResult> responseData = new HashMap<>();
+ LinkedHashMap<TopicIdPartition, LogReadResult> responseData = new
LinkedHashMap<>();
responseLogResult.foreach(tpLogResult -> {
responseData.put(tpLogResult._1(), tpLogResult._2());
return BoxedUnit.UNIT;
@@ -350,7 +357,7 @@ public class DelayedShareFetch extends DelayedOperation {
return responseData;
}
- private boolean anyPartitionHasLogReadError(Map<TopicIdPartition,
LogReadResult> replicaManagerReadResponse) {
+ private boolean
anyPartitionHasLogReadError(LinkedHashMap<TopicIdPartition, LogReadResult>
replicaManagerReadResponse) {
return replicaManagerReadResponse.values().stream()
.anyMatch(logReadResult -> logReadResult.error().code() !=
Errors.NONE.code());
}
@@ -379,9 +386,9 @@ public class DelayedShareFetch extends DelayedOperation {
}
// Visible for testing.
- Map<TopicIdPartition, LogReadResult>
combineLogReadResponse(Map<TopicIdPartition, FetchRequest.PartitionData>
topicPartitionData,
-
Map<TopicIdPartition, LogReadResult> existingFetchedData) {
- Map<TopicIdPartition, FetchRequest.PartitionData>
missingLogReadTopicPartitions = new LinkedHashMap<>();
+ LinkedHashMap<TopicIdPartition, LogReadResult>
combineLogReadResponse(LinkedHashMap<TopicIdPartition,
FetchRequest.PartitionData> topicPartitionData,
+
LinkedHashMap<TopicIdPartition, LogReadResult> existingFetchedData) {
+ LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData>
missingLogReadTopicPartitions = new LinkedHashMap<>();
topicPartitionData.forEach((topicIdPartition, partitionData) -> {
if (!existingFetchedData.containsKey(topicIdPartition)) {
missingLogReadTopicPartitions.put(topicIdPartition,
partitionData);
@@ -390,7 +397,7 @@ public class DelayedShareFetch extends DelayedOperation {
if (missingLogReadTopicPartitions.isEmpty()) {
return existingFetchedData;
}
- Map<TopicIdPartition, LogReadResult>
missingTopicPartitionsLogReadResponse =
readFromLog(missingLogReadTopicPartitions);
+ LinkedHashMap<TopicIdPartition, LogReadResult>
missingTopicPartitionsLogReadResponse =
readFromLog(missingLogReadTopicPartitions);
missingTopicPartitionsLogReadResponse.putAll(existingFetchedData);
return missingTopicPartitionsLogReadResponse;
}
@@ -402,4 +409,9 @@ public class DelayedShareFetch extends DelayedOperation {
sharePartition.releaseFetchLock();
});
}
+
+ // Visible for testing.
+ Lock lock() {
+ return lock;
+ }
}
diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
index f1f708f9dd9..12b89cffd37 100644
--- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
+++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -127,6 +127,8 @@ public class DelayedShareFetchTest {
assertFalse(delayedShareFetch.tryComplete());
assertFalse(delayedShareFetch.isCompleted());
Mockito.verify(delayedShareFetch,
times(0)).releasePartitionLocks(any());
+ assertTrue(delayedShareFetch.lock().tryLock());
+ delayedShareFetch.lock().unlock();
}
@Test
@@ -182,6 +184,8 @@ public class DelayedShareFetchTest {
assertFalse(delayedShareFetch.tryComplete());
assertFalse(delayedShareFetch.isCompleted());
Mockito.verify(delayedShareFetch,
times(1)).releasePartitionLocks(any());
+ assertTrue(delayedShareFetch.lock().tryLock());
+ delayedShareFetch.lock().unlock();
}
@Test
@@ -233,6 +237,8 @@ public class DelayedShareFetchTest {
assertFalse(delayedShareFetch.tryComplete());
assertFalse(delayedShareFetch.isCompleted());
Mockito.verify(delayedShareFetch,
times(1)).releasePartitionLocks(any());
+ assertTrue(delayedShareFetch.lock().tryLock());
+ delayedShareFetch.lock().unlock();
}
@Test
@@ -278,6 +284,8 @@ public class DelayedShareFetchTest {
assertTrue(delayedShareFetch.tryComplete());
assertTrue(delayedShareFetch.isCompleted());
Mockito.verify(delayedShareFetch,
times(1)).releasePartitionLocks(any());
+ assertTrue(delayedShareFetch.lock().tryLock());
+ delayedShareFetch.lock().unlock();
}
@Test
@@ -321,6 +329,8 @@ public class DelayedShareFetchTest {
any(), any(), any(ReplicaQuota.class), anyBoolean());
assertTrue(delayedShareFetch.isCompleted());
Mockito.verify(delayedShareFetch,
times(0)).releasePartitionLocks(any());
+ assertTrue(delayedShareFetch.lock().tryLock());
+ delayedShareFetch.lock().unlock();
}
@Test
@@ -368,6 +378,8 @@ public class DelayedShareFetchTest {
assertTrue(delayedShareFetch.isCompleted());
assertTrue(shareFetch.isCompleted());
Mockito.verify(delayedShareFetch,
times(1)).releasePartitionLocks(any());
+ assertTrue(delayedShareFetch.lock().tryLock());
+ delayedShareFetch.lock().unlock();
}
@Test
@@ -404,6 +416,8 @@ public class DelayedShareFetchTest {
// Verifying that the first forceComplete calls acquirablePartitions
method in DelayedShareFetch.
Mockito.verify(delayedShareFetch, times(1)).acquirablePartitions();
assertEquals(0, future.join().size());
+ assertTrue(delayedShareFetch.lock().tryLock());
+ delayedShareFetch.lock().unlock();
// Force completing the share fetch request for the second time should
hit the future completion check and not
// proceed ahead in the function.
@@ -412,6 +426,8 @@ public class DelayedShareFetchTest {
// Verifying that the second forceComplete does not call
acquirablePartitions method in DelayedShareFetch.
Mockito.verify(delayedShareFetch, times(1)).acquirablePartitions();
Mockito.verify(delayedShareFetch,
times(0)).releasePartitionLocks(any());
+ assertTrue(delayedShareFetch.lock().tryLock());
+ delayedShareFetch.lock().unlock();
}
@Test
@@ -462,6 +478,8 @@ public class DelayedShareFetchTest {
assertEquals(2, delayedShareFetchPurgatory.watched());
assertFalse(shareFetch1.isCompleted());
+ assertTrue(delayedShareFetch1.lock().tryLock());
+ delayedShareFetch1.lock().unlock();
Map<TopicIdPartition, Integer> partitionMaxBytes2 = new HashMap<>();
partitionMaxBytes2.put(tp1, PARTITION_MAX_BYTES);
@@ -499,6 +517,8 @@ public class DelayedShareFetchTest {
Mockito.verify(replicaManager, times(1)).addToActionQueue(any());
Mockito.verify(replicaManager, times(0)).tryCompleteActions();
Mockito.verify(delayedShareFetch2,
times(1)).releasePartitionLocks(any());
+ assertTrue(delayedShareFetch2.lock().tryLock());
+ delayedShareFetch2.lock().unlock();
}
@Test
@@ -530,21 +550,21 @@ public class DelayedShareFetchTest {
.withSharePartitions(sharePartitions)
.build();
- Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData =
new HashMap<>();
+ LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData>
topicPartitionData = new LinkedHashMap<>();
topicPartitionData.put(tp0, mock(FetchRequest.PartitionData.class));
topicPartitionData.put(tp1, mock(FetchRequest.PartitionData.class));
// Case 1 - logReadResponse contains tp0.
- Map<TopicIdPartition, LogReadResult> logReadResponse =
Collections.singletonMap(
- tp0, mock(LogReadResult.class));
+ LinkedHashMap<TopicIdPartition, LogReadResult> logReadResponse = new
LinkedHashMap<>();
+ logReadResponse.put(tp0, mock(LogReadResult.class));
doAnswer(invocation ->
buildLogReadResult(Collections.singleton(tp1))).when(replicaManager).readFromLog(any(),
any(), any(ReplicaQuota.class), anyBoolean());
- Map<TopicIdPartition, LogReadResult> combinedLogReadResponse =
delayedShareFetch.combineLogReadResponse(topicPartitionData, logReadResponse);
+ LinkedHashMap<TopicIdPartition, LogReadResult> combinedLogReadResponse
= delayedShareFetch.combineLogReadResponse(topicPartitionData, logReadResponse);
assertEquals(topicPartitionData.keySet(),
combinedLogReadResponse.keySet());
assertEquals(combinedLogReadResponse.get(tp0),
logReadResponse.get(tp0));
// Case 2 - logReadResponse contains tp0 and tp1.
- logReadResponse = new HashMap<>();
+ logReadResponse = new LinkedHashMap<>();
logReadResponse.put(tp0, mock(LogReadResult.class));
logReadResponse.put(tp1, mock(LogReadResult.class));
combinedLogReadResponse =
delayedShareFetch.combineLogReadResponse(topicPartitionData, logReadResponse);
@@ -613,6 +633,8 @@ public class DelayedShareFetchTest {
Mockito.verify(replicaManager, times(1)).readFromLog(
any(), any(), any(ReplicaQuota.class), anyBoolean());
Mockito.verify(delayedShareFetch,
times(1)).releasePartitionLocks(any());
+ assertTrue(delayedShareFetch.lock().tryLock());
+ delayedShareFetch.lock().unlock();
}
@Test
@@ -645,6 +667,8 @@ public class DelayedShareFetchTest {
assertFalse(spy.tryComplete());
Mockito.verify(sp0, times(1)).releaseFetchLock();
+ assertTrue(delayedShareFetch.lock().tryLock());
+ delayedShareFetch.lock().unlock();
}
@Test
@@ -669,6 +693,8 @@ public class DelayedShareFetchTest {
assertFalse(delayedShareFetch.tryComplete());
Mockito.verify(sp0, times(1)).releaseFetchLock();
+ assertTrue(delayedShareFetch.lock().tryLock());
+ delayedShareFetch.lock().unlock();
}
static void mockTopicIdPartitionToReturnDataEqualToMinBytes(ReplicaManager
replicaManager, TopicIdPartition topicIdPartition, int minBytes) {
diff --git
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index fbbccadff8b..06bb123f550 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -1727,6 +1727,8 @@ public class SharePartitionManagerTest {
Mockito.verify(sp1, times(1)).nextFetchOffset();
Mockito.verify(sp2, times(0)).nextFetchOffset();
+ assertTrue(delayedShareFetch.lock().tryLock());
+ delayedShareFetch.lock().unlock();
}
@Test
@@ -1825,6 +1827,8 @@ public class SharePartitionManagerTest {
Mockito.verify(sp1, times(0)).nextFetchOffset();
Mockito.verify(sp2, times(0)).nextFetchOffset();
+ assertTrue(delayedShareFetch.lock().tryLock());
+ delayedShareFetch.lock().unlock();
}
@Test
@@ -1923,6 +1927,8 @@ public class SharePartitionManagerTest {
Mockito.verify(sp1, times(1)).nextFetchOffset();
Mockito.verify(sp2, times(0)).nextFetchOffset();
+ assertTrue(delayedShareFetch.lock().tryLock());
+ delayedShareFetch.lock().unlock();
}
@Test
@@ -2025,6 +2031,8 @@ public class SharePartitionManagerTest {
Mockito.verify(sp1, times(0)).nextFetchOffset();
Mockito.verify(sp2, times(0)).nextFetchOffset();
+ assertTrue(delayedShareFetch.lock().tryLock());
+ delayedShareFetch.lock().unlock();
}
@Test
diff --git
a/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java
b/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java
index 0ad638240c8..f3c818cb9c6 100644
---
a/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java
+++
b/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java
@@ -42,8 +42,8 @@ import java.util.concurrent.locks.ReentrantLock;
public abstract class DelayedOperation extends TimerTask {
private final AtomicBoolean completed = new AtomicBoolean(false);
- // Visible for testing
- final Lock lock;
+
+ protected final Lock lock;
public DelayedOperation(long delayMs, Optional<Lock> lockOpt) {
this(delayMs, lockOpt.orElse(new ReentrantLock()));