This is an automated email from the ASF dual-hosted git repository.
eshu11 pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 8ac449f GEODE-6379: track departed members to avoid processing
in-flight lock request (#3176)
8ac449f is described below
commit 8ac449fd14c6d61498770f0039c5a4179d45c8ca
Author: pivotal-eshu <[email protected]>
AuthorDate: Tue Feb 26 15:18:11 2019 -0800
GEODE-6379: track departed members to avoid processing in-flight lock
request (#3176)
---
.../distributed/internal/locks/DLockGrantor.java | 50 +++++++++++-
.../internal/locks/DLockGrantorTest.java | 94 ++++++++++++++++++++++
2 files changed, 143 insertions(+), 1 deletion(-)
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockGrantor.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockGrantor.java
index 1ac754f..6157e59 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockGrantor.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockGrantor.java
@@ -15,12 +15,15 @@
package org.apache.geode.distributed.internal.locks;
+import static java.util.concurrent.TimeUnit.DAYS;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -32,7 +35,9 @@ import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelCriterion;
import org.apache.geode.CancelException;
+import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.CommitConflictException;
+import org.apache.geode.cache.TransactionDataNodeHasDepartedException;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
import org.apache.geode.distributed.LockServiceDestroyedException;
import org.apache.geode.distributed.internal.DistributionConfig;
@@ -146,6 +151,9 @@ public class DLockGrantor {
*/
private final TXReservationMgr resMgr = new TXReservationMgr(false);
+  /**
+   * Departure time, as returned by {@link #getCurrentTime()}, of each member recorded by
+   * {@link #recordMemberDepartedTime(InternalDistributedMember)}. A LinkedHashMap is used
+   * because pruning of old records walks the entries in insertion order.
+   */
+  private final Map<InternalDistributedMember, Long> membersDepartedTime = new LinkedHashMap<>();
+
+  /** How long, in milliseconds, a departed member's record is kept before it may be pruned. */
+  private final long departedMemberKeptInMapMilliSeconds = DAYS.toMillis(1);
+
/**
* Enforces waiting until this grantor is initialized. Used to block all
lock requests until
* INITIALIZED.
@@ -498,7 +506,8 @@ public class DLockGrantor {
}
DLockBatch batch = (DLockBatch) request.getObjectName();
- this.resMgr.makeReservation((IdentityArrayList) batch.getReqs());
+ checkIfHostDeparted(batch.getOwner());
+ resMgr.makeReservation((IdentityArrayList) batch.getReqs());
if (isTraceEnabled_DLS) {
logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantor.handleLockBatch]
granting {}",
batch.getBatchId());
@@ -513,6 +522,17 @@ public class DLockGrantor {
}
}
+  /**
+   * Rejects a lock batch whose owner has already been recorded as departed.
+   *
+   * @param owner the transaction host / txLock requester that owns the batch
+   * @throws TransactionDataNodeHasDepartedException if {@code owner} has a record in
+   *         {@code membersDepartedTime}
+   */
+  private void checkIfHostDeparted(InternalDistributedMember owner) {
+    final boolean ownerHasDeparted;
+    // Already held batchLocks; hold membersDepartedTime lock just for clarity
+    synchronized (membersDepartedTime) {
+      ownerHasDeparted = membersDepartedTime.containsKey(owner);
+    }
+    if (!ownerHasDeparted) {
+      return;
+    }
+    // the transaction host/txLock requester has departed.
+    throw new TransactionDataNodeHasDepartedException(
+        "The transaction host " + owner + " is no longer a member of the cluster.");
+  }
+
/**
* Returns transaction optimized lock batches that were created by the
specified owner.
* <p>
@@ -524,6 +544,10 @@ public class DLockGrantor {
public DLockBatch[] getLockBatches(InternalDistributedMember owner) {
// Key: Object batchId, Value: DLockBatch batch
synchronized (this.batchLocks) {
+ // put owner into the map first so that no new threads will handle
in-flight requests
+ // from the departed member to lock keys
+ recordMemberDepartedTime(owner);
+
List batchList = new ArrayList();
for (Iterator iter = this.batchLocks.values().iterator();
iter.hasNext();) {
DLockBatch batch = (DLockBatch) iter.next();
@@ -535,6 +559,30 @@ public class DLockGrantor {
}
}
+  /**
+   * Records the current time as the departure time of {@code owner} and prunes records that are
+   * older than {@code departedMemberKeptInMapMilliSeconds}.
+   *
+   * @param owner the member to record as departed
+   */
+  void recordMemberDepartedTime(InternalDistributedMember owner) {
+    // getLockBatches calls this while holding batchLocks; the membersDepartedTime lock is taken
+    // as well so that every access to the map is guarded by the map itself.
+    synchronized (membersDepartedTime) {
+      long currentTime = getCurrentTime();
+      long expirationCutoff = currentTime - departedMemberKeptInMapMilliSeconds;
+      Iterator<Long> departureTimes = membersDepartedTime.values().iterator();
+      while (departureTimes.hasNext()) {
+        if (departureTimes.next() < expirationCutoff) {
+          departureTimes.remove();
+        } else {
+          // Records are kept oldest-first, so the first unexpired record ends the scan.
+          break;
+        }
+      }
+      // LinkedHashMap.put() on an existing key leaves the key at its original position. Remove
+      // it first so a re-recorded member moves to the end and the oldest-first order, which the
+      // early break above depends on, stays valid.
+      membersDepartedTime.remove(owner);
+      membersDepartedTime.put(owner, currentTime);
+    }
+  }
+
+  /**
+   * Returns the current time in milliseconds from {@link System#currentTimeMillis()}.
+   * Package-private so that a test can stub the clock.
+   */
+  long getCurrentTime() {
+    final long nowMillis = System.currentTimeMillis();
+    return nowMillis;
+  }
+
+  /**
+   * Returns the live map of departed members to their recorded departure times. Intended for
+   * tests only; the returned map is the internal instance, not a copy.
+   */
+  @VisibleForTesting
+  Map<InternalDistributedMember, Long> getMembersDepartedTimeRecords() {
+    return membersDepartedTime;
+  }
+
/**
* Get the batch for the given batchId (for example use a txLockId from
TXLockBatch in order to
* update its participants). This operation was added as part of the
solution to bug 32999.
diff --git
a/geode-core/src/test/java/org/apache/geode/distributed/internal/locks/DLockGrantorTest.java
b/geode-core/src/test/java/org/apache/geode/distributed/internal/locks/DLockGrantorTest.java
new file mode 100644
index 0000000..99d3c02
--- /dev/null
+++
b/geode-core/src/test/java/org/apache/geode/distributed/internal/locks/DLockGrantorTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.distributed.internal.locks;
+
+import static java.util.concurrent.TimeUnit.DAYS;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.TransactionDataNodeHasDepartedException;
+import org.apache.geode.distributed.internal.DistributionManager;
+import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+
+/**
+ * Unit tests for the departed-member tracking in {@link DLockGrantor}.
+ */
+public class DLockGrantorTest {
+  private DLockService dLockService;
+  private DistributionManager distributionManager;
+  private DLockGrantor grantor;
+
+  /** Builds a grantor on top of a mocked lock service and distribution manager. */
+  @Before
+  public void setup() {
+    CancelCriterion cancelCriterion = mock(CancelCriterion.class);
+    distributionManager = mock(DistributionManager.class);
+    when(distributionManager.getCancelCriterion()).thenReturn(cancelCriterion);
+
+    dLockService = mock(DLockService.class, RETURNS_DEEP_STUBS);
+    when(dLockService.getDistributionManager()).thenReturn(distributionManager);
+
+    grantor = DLockGrantor.createGrantor(dLockService, 1);
+  }
+
+  /** A lock batch owned by a member already reported via getLockBatches must be rejected. */
+  @Test
+  public void handleLockBatchThrowsIfRequesterHasDeparted() {
+    DLockLessorDepartureHandler departureHandler = mock(DLockLessorDepartureHandler.class);
+    when(dLockService.getDLockLessorDepartureHandler()).thenReturn(departureHandler);
+
+    InternalDistributedMember requester = mock(InternalDistributedMember.class);
+    DLockBatch lockBatch = mock(DLockBatch.class);
+    when(lockBatch.getOwner()).thenReturn(requester);
+
+    DLockRequestProcessor.DLockRequestMessage requestMessage =
+        mock(DLockRequestProcessor.DLockRequestMessage.class);
+    when(requestMessage.getObjectName()).thenReturn(lockBatch);
+
+    grantor.makeReady(true);
+    grantor.getLockBatches(requester);
+
+    assertThatThrownBy(() -> grantor.handleLockBatch(requestMessage))
+        .isInstanceOf(TransactionDataNodeHasDepartedException.class);
+  }
+
+  /** Recording a departed member makes it visible in the departure records. */
+  @Test
+  public void recordMemberDepartedTimeRecords() {
+    InternalDistributedMember owner = mock(InternalDistributedMember.class);
+
+    grantor.recordMemberDepartedTime(owner);
+
+    assertThat(grantor.getMembersDepartedTimeRecords()).containsKey(owner);
+  }
+
+  /** Records older than one day are dropped when a later departure is recorded. */
+  @Test
+  public void recordMemberDepartedTimeRemovesExpiredMembers() {
+    DLockGrantor spiedGrantor = spy(grantor);
+    long firstDepartureTime = System.currentTimeMillis();
+    long timeAfterExpiry = firstDepartureTime + 1 + DAYS.toMillis(1);
+    // The first two recordings share a timestamp; the third happens just past the retention
+    // period.
+    doReturn(firstDepartureTime).doReturn(firstDepartureTime).doReturn(timeAfterExpiry)
+        .when(spiedGrantor).getCurrentTime();
+
+    spiedGrantor.recordMemberDepartedTime(mock(InternalDistributedMember.class));
+    spiedGrantor.recordMemberDepartedTime(mock(InternalDistributedMember.class));
+    int recordsBeforeExpiry = spiedGrantor.getMembersDepartedTimeRecords().size();
+    assertThat(recordsBeforeExpiry).isEqualTo(2);
+
+    InternalDistributedMember owner = mock(InternalDistributedMember.class);
+    spiedGrantor.recordMemberDepartedTime(owner);
+
+    int recordsAfterExpiry = spiedGrantor.getMembersDepartedTimeRecords().size();
+    assertThat(recordsAfterExpiry).isEqualTo(1);
+    assertThat(spiedGrantor.getMembersDepartedTimeRecords()).containsKey(owner);
+  }
+
+}