This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 8ac449f  GEODE-6379: track departed members to avoid processing in-flight lock request (#3176)
8ac449f is described below

commit 8ac449fd14c6d61498770f0039c5a4179d45c8ca
Author: pivotal-eshu <[email protected]>
AuthorDate: Tue Feb 26 15:18:11 2019 -0800

    GEODE-6379: track departed members to avoid processing in-flight lock request (#3176)
---
 .../distributed/internal/locks/DLockGrantor.java   | 50 +++++++++++-
 .../internal/locks/DLockGrantorTest.java           | 94 ++++++++++++++++++++++
 2 files changed, 143 insertions(+), 1 deletion(-)

diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockGrantor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockGrantor.java
index 1ac754f..6157e59 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockGrantor.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockGrantor.java
@@ -15,12 +15,15 @@
 
 package org.apache.geode.distributed.internal.locks;
 
+import static java.util.concurrent.TimeUnit.DAYS;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -32,7 +35,9 @@ import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.CancelCriterion;
 import org.apache.geode.CancelException;
+import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.cache.CommitConflictException;
+import org.apache.geode.cache.TransactionDataNodeHasDepartedException;
 import org.apache.geode.distributed.DistributedSystemDisconnectedException;
 import org.apache.geode.distributed.LockServiceDestroyedException;
 import org.apache.geode.distributed.internal.DistributionConfig;
@@ -146,6 +151,9 @@ public class DLockGrantor {
    */
   private final TXReservationMgr resMgr = new TXReservationMgr(false);
 
+  private final Map<InternalDistributedMember, Long> membersDepartedTime = new 
LinkedHashMap();
+  private final long departedMemberKeptInMapMilliSeconds = DAYS.toMillis(1);
+
   /**
    * Enforces waiting until this grantor is initialized. Used to block all 
lock requests until
    * INITIALIZED.
@@ -498,7 +506,8 @@ public class DLockGrantor {
         }
 
         DLockBatch batch = (DLockBatch) request.getObjectName();
-        this.resMgr.makeReservation((IdentityArrayList) batch.getReqs());
+        checkIfHostDeparted(batch.getOwner());
+        resMgr.makeReservation((IdentityArrayList) batch.getReqs());
         if (isTraceEnabled_DLS) {
           logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantor.handleLockBatch] 
granting {}",
               batch.getBatchId());
@@ -513,6 +522,17 @@ public class DLockGrantor {
     }
   }
 
+  private void checkIfHostDeparted(InternalDistributedMember owner) {
+    // Already held batchLocks; hold membersDepartedTime lock just for clarity
+    synchronized (membersDepartedTime) {
+      // the transaction host/txLock requester has departed.
+      if (membersDepartedTime.containsKey(owner)) {
+        throw new TransactionDataNodeHasDepartedException(
+            "The transaction host " + owner + " is no longer a member of the 
cluster.");
+      }
+    }
+  }
+
   /**
    * Returns transaction optimized lock batches that were created by the 
specified owner.
    * <p>
@@ -524,6 +544,10 @@ public class DLockGrantor {
   public DLockBatch[] getLockBatches(InternalDistributedMember owner) {
     // Key: Object batchId, Value: DLockBatch batch
     synchronized (this.batchLocks) {
+      // put owner into the map first so that no new threads will handle 
in-flight requests
+      // from the departed member to lock keys
+      recordMemberDepartedTime(owner);
+
       List batchList = new ArrayList();
       for (Iterator iter = this.batchLocks.values().iterator(); 
iter.hasNext();) {
         DLockBatch batch = (DLockBatch) iter.next();
@@ -535,6 +559,30 @@ public class DLockGrantor {
     }
   }
 
+  void recordMemberDepartedTime(InternalDistributedMember owner) {
+    // Already held batchLocks; hold membersDepartedTime lock just for clarity
+    synchronized (membersDepartedTime) {
+      long currentTime = getCurrentTime();
+      for (Iterator iterator = membersDepartedTime.values().iterator(); 
iterator.hasNext();) {
+        if ((long) iterator.next() < currentTime - 
departedMemberKeptInMapMilliSeconds) {
+          iterator.remove();
+        } else {
+          break;
+        }
+      }
+      membersDepartedTime.put(owner, currentTime);
+    }
+  }
+
+  long getCurrentTime() {
+    return System.currentTimeMillis();
+  }
+
+  @VisibleForTesting
+  Map getMembersDepartedTimeRecords() {
+    return membersDepartedTime;
+  }
+
   /**
    * Get the batch for the given batchId (for example use a txLockId from 
TXLockBatch in order to
    * update its participants). This operation was added as part of the 
solution to bug 32999.
diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/locks/DLockGrantorTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/locks/DLockGrantorTest.java
new file mode 100644
index 0000000..99d3c02
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/locks/DLockGrantorTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.distributed.internal.locks;
+
+import static java.util.concurrent.TimeUnit.DAYS;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.TransactionDataNodeHasDepartedException;
+import org.apache.geode.distributed.internal.DistributionManager;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+
+public class DLockGrantorTest {
+  private DLockService dLockService;
+  private DistributionManager distributionManager;
+  private DLockGrantor grantor;
+
+  @Before
+  public void setup() {
+    dLockService = mock(DLockService.class, RETURNS_DEEP_STUBS);
+    distributionManager = mock(DistributionManager.class);
+    
when(dLockService.getDistributionManager()).thenReturn(distributionManager);
+    CancelCriterion cancelCriterion = mock(CancelCriterion.class);
+    when(distributionManager.getCancelCriterion()).thenReturn(cancelCriterion);
+    grantor = DLockGrantor.createGrantor(dLockService, 1);
+  }
+
+  @Test
+  public void handleLockBatchThrowsIfRequesterHasDeparted() {
+    DLockLessorDepartureHandler handler = 
mock(DLockLessorDepartureHandler.class);
+    InternalDistributedMember requester = 
mock(InternalDistributedMember.class);
+    DLockRequestProcessor.DLockRequestMessage requestMessage =
+        mock(DLockRequestProcessor.DLockRequestMessage.class);
+    when(dLockService.getDLockLessorDepartureHandler()).thenReturn(handler);
+    DLockBatch lockBatch = mock(DLockBatch.class);
+    when(requestMessage.getObjectName()).thenReturn(lockBatch);
+    when(lockBatch.getOwner()).thenReturn(requester);
+
+    grantor.makeReady(true);
+    grantor.getLockBatches(requester);
+
+    assertThatThrownBy(() -> 
grantor.handleLockBatch(requestMessage)).isInstanceOf(
+        TransactionDataNodeHasDepartedException.class);
+  }
+
+  @Test
+  public void recordMemberDepartedTimeRecords() {
+    InternalDistributedMember owner = mock(InternalDistributedMember.class);
+    grantor.recordMemberDepartedTime(owner);
+
+    assertThat(grantor.getMembersDepartedTimeRecords()).containsKey(owner);
+  }
+
+  @Test
+  public void recordMemberDepartedTimeRemovesExpiredMembers() {
+    DLockGrantor spy = spy(grantor);
+    long currentTime = System.currentTimeMillis();
+    doReturn(currentTime).doReturn(currentTime).doReturn(currentTime + 1 + 
DAYS.toMillis(1))
+        .when(spy).getCurrentTime();
+
+    for (int i = 0; i < 2; i++) {
+      spy.recordMemberDepartedTime(mock(InternalDistributedMember.class));
+    }
+    assertThat(spy.getMembersDepartedTimeRecords().size()).isEqualTo(2);
+
+    InternalDistributedMember owner = mock(InternalDistributedMember.class);
+    spy.recordMemberDepartedTime(owner);
+
+    assertThat(spy.getMembersDepartedTimeRecords().size()).isEqualTo(1);
+    assertThat(spy.getMembersDepartedTimeRecords()).containsKey(owner);
+  }
+
+}

Reply via email to