This is an automated email from the ASF dual-hosted git repository.
mkevo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 4cb75ae484 GEODE-10395 remove locks from List if dlock.acquireTryLocks return false (#7846)
4cb75ae484 is described below
commit 4cb75ae4848250606db2f4b14300601755586192
Author: Mario Kevo <[email protected]>
AuthorDate: Tue Sep 20 19:04:08 2022 +0200
GEODE-10395 remove locks from List if dlock.acquireTryLocks return false (#7846)
---
.../internal/cache/locks/TXLockServiceImpl.java | 26 ++++++--
.../internal/StartupMessageJUnitTest.java | 4 +-
.../cache/locks/TXLockServiceImplTest.java | 71 ++++++++++++++++++++++
3 files changed, 95 insertions(+), 6 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/locks/TXLockServiceImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/locks/TXLockServiceImpl.java
index f7e7aebe0f..44b9c9f440 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/locks/TXLockServiceImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/locks/TXLockServiceImpl.java
@@ -22,6 +22,7 @@ import java.util.Set;
import org.apache.logging.log4j.Logger;
+import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CommitConflictException;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
@@ -53,7 +54,7 @@ public class TXLockServiceImpl extends TXLockService {
/**
* List of active txLockIds
*/
- protected List txLockIdList = new ArrayList();
+ protected final List<TXLockId> txLockIdList = new ArrayList<>();
/**
* True if grantor recovery is in progress; used to keep
<code>release</code> from waiting for
@@ -70,6 +71,14 @@ public class TXLockServiceImpl extends TXLockService {
/** The distributed system for cancellation checks. */
private final InternalDistributedSystem system;
+ @VisibleForTesting
+ TXLockServiceImpl(InternalDistributedSystem sys,
StoppableReentrantReadWriteLock recoveryLock,
+ DLockService dlock) {
+ system = sys;
+ this.recoveryLock = recoveryLock;
+ this.dlock = dlock;
+ }
+
TXLockServiceImpl(String name, InternalDistributedSystem sys) {
if (sys == null) {
throw new IllegalStateException(
@@ -129,10 +138,16 @@ public class TXLockServiceImpl extends TXLockService {
if (gotLocks) { // ...otherwise race can occur between tryLocks and
readLock
acquireRecoveryReadLock();
} else if (keyIfFail[0] != null) {
+ synchronized (txLockIdList) {
+ txLockIdList.remove(txLockId);
+ }
throw new CommitConflictException(
String.format("Concurrent transaction commit detected %s",
keyIfFail[0]));
} else {
+ synchronized (txLockIdList) {
+ txLockIdList.remove(txLockId);
+ }
throw new CommitConflictException(
String.format("Failed to request try locks from grantor: %s",
dlock.getLockGrantorId()));
@@ -225,9 +240,7 @@ public class TXLockServiceImpl extends TXLockService {
txLockId));
}
- dlock.releaseTryLocks(txLockId, () -> {
- return recovering;
- });
+ dlock.releaseTryLocks(txLockId, () -> recovering);
txLockIdList.remove(txLockId);
releaseRecoveryReadLock();
@@ -277,4 +290,9 @@ public class TXLockServiceImpl extends TXLockService {
dlock.destroyAndRemove();
}
+ @VisibleForTesting
+ public int getTxLockIdList() {
+ return this.txLockIdList.size();
+ }
+
}
diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/StartupMessageJUnitTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/StartupMessageJUnitTest.java
index d017b96ad9..b51453a3a4 100644
--- a/geode-core/src/test/java/org/apache/geode/distributed/internal/StartupMessageJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/StartupMessageJUnitTest.java
@@ -95,7 +95,7 @@ public class StartupMessageJUnitTest {
startupMessage.process(distributionManager);
assertThat(
- startupMessage.getProcessorType() ==
OperationExecutors.WAITING_POOL_EXECUTOR);
+
startupMessage.getProcessorType()).isEqualTo(OperationExecutors.WAITING_POOL_EXECUTOR);
}
@Test
@@ -111,6 +111,6 @@ public class StartupMessageJUnitTest {
assertThat(
startupResponseMessage
- .getProcessorType() == OperationExecutors.WAITING_POOL_EXECUTOR);
+
.getProcessorType()).isEqualTo(OperationExecutors.WAITING_POOL_EXECUTOR);
}
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/locks/TXLockServiceImplTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/locks/TXLockServiceImplTest.java
new file mode 100644
index 0000000000..e4f8a7b01b
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/locks/TXLockServiceImplTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.cache.locks;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.cache.CommitConflictException;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.locks.DLockService;
+import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import
org.apache.geode.internal.util.concurrent.StoppableReentrantReadWriteLock;
+
+public class TXLockServiceImplTest {
+ private TXLockServiceImpl txLockService;
+ private InternalDistributedSystem internalDistributedSystem;
+ private DLockService dlock;
+ private List distLocks;
+ private Set otherMembers;
+ private DistributionManager distributionManager;
+ private InternalDistributedMember distributedMember;
+ private StoppableReentrantReadWriteLock recoverylock;
+
+ @Before
+ public void setUp() {
+ internalDistributedSystem = mock(InternalDistributedSystem.class);
+ dlock = mock(DLockService.class);
+ distributionManager = mock(DistributionManager.class);
+ distributedMember = mock(InternalDistributedMember.class);
+ recoverylock = mock(StoppableReentrantReadWriteLock.class);
+ }
+
+ @Test
+ public void testTxLockService() {
+ distLocks = new ArrayList();
+ txLockService = new TXLockServiceImpl(internalDistributedSystem,
recoverylock, dlock);
+
+ when(dlock.getDistributionManager()).thenReturn(distributionManager);
+ when(dlock.getDistributionManager().getId()).thenReturn(distributedMember);
+
+ assertThat((txLockService).getTxLockIdList()).isEqualTo(0);
+
+ assertThrows(CommitConflictException.class,
+ () -> txLockService.txLock(distLocks, otherMembers));
+
+ assertThat((txLockService).getTxLockIdList()).isEqualTo(0);
+ }
+}