GEODE-2757: Do not process a netsearch reply from a departed node that the membership listener has already detected.
Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/d497d63a Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/d497d63a Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/d497d63a Branch: refs/heads/feature/GEODE-2485 Commit: d497d63af422b3b98c480698a9470812539f8a83 Parents: 799548e Author: eshu <e...@pivotal.io> Authored: Fri Apr 7 11:37:35 2017 -0700 Committer: eshu <e...@pivotal.io> Committed: Fri Apr 7 11:37:35 2017 -0700 ---------------------------------------------------------------------- .../geode/internal/cache/DistributedRegion.java | 2 +- .../cache/SearchLoadAndWriteProcessor.java | 61 +++++++---- .../cache/SearchLoadAndWriteProcessorTest.java | 102 ++++++++++++++++++- 3 files changed, 143 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/d497d63a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java index fa02574..c12a652 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java @@ -2126,7 +2126,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA return this.distAdvisor; } - public final CacheDistributionAdvisor getCacheDistributionAdvisor() { + public CacheDistributionAdvisor getCacheDistributionAdvisor() { return this.distAdvisor; } http://git-wip-us.apache.org/repos/asf/geode/blob/d497d63a/geode-core/src/main/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessor.java ---------------------------------------------------------------------- diff 
--git a/geode-core/src/main/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessor.java index 3d969f9..2a10792 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessor.java @@ -67,7 +67,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "search-retry-interval", 2000).longValue(); - private InternalDistributedMember selectedNode; + private volatile InternalDistributedMember selectedNode; private boolean selectedNodeDead = false; private int timeout; private boolean netSearchDone = false; @@ -108,6 +108,8 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { private final Object membersLock = new Object(); + private ArrayList<InternalDistributedMember> departedMembers; + private Lock lock = null; // if non-null, then needs to be unlocked in release static final int NETSEARCH = 0; @@ -221,6 +223,10 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { } synchronized (this) { if (id.equals(selectedNode) && (this.requestInProgress) && (this.remoteGetInProgress)) { + if (departedMembers == null) { + departedMembers = new ArrayList<InternalDistributedMember>(); + } + departedMembers.add(id); selectedNode = null; selectedNodeDead = true; computeRemainingTimeout(); @@ -231,8 +237,9 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { notifyAll(); // signal the waiter; we are not done; but we need the waiter to call // sendNetSearchRequest } - if (responseQueue != null) + if (responseQueue != null) { responseQueue.remove(id); + } checkIfDone(); } } @@ -378,6 +385,10 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { /************** Package Methods **********************/ + 
InternalDistributedMember getSelectedNode() { + return this.selectedNode; + } + /************** Private Methods **********************/ /** * Even though SearchLoadAndWriteProcessor may be in invoked in the context of a local region, @@ -495,25 +506,28 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { synchronized (this.pendingResponders) { this.pendingResponders.clear(); } - this.requestInProgress = true; - this.remoteGetInProgress = true; + synchronized (this) { + this.requestInProgress = true; + this.remoteGetInProgress = true; setSelectedNode(replicate); this.lastNotifySpot = 0; - } - sendValueRequest(replicate); - waitForObject2(this.remainingTimeout); - if (this.authorative) { - if (this.result != null) { - this.netSearch = true; + + sendValueRequest(replicate); + waitForObject2(this.remainingTimeout); + + if (this.authorative) { + if (this.result != null) { + this.netSearch = true; + } + return; + } else { + // clear anything that might have been set by our query. + this.selectedNode = null; + this.selectedNodeDead = false; + this.lastNotifySpot = 0; + this.result = null; } - return; - } else { - // clear anything that might have been set by our query. 
- this.selectedNode = null; - this.selectedNodeDead = false; - this.lastNotifySpot = 0; - this.result = null; } } synchronized (membersLock) { @@ -1055,8 +1069,15 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { @SuppressWarnings("hiding") protected synchronized void incomingNetSearchReply(byte[] value, long lastModifiedTime, - boolean serialized, boolean requestorTimedOut, boolean authorative, VersionTag versionTag) { + boolean serialized, boolean requestorTimedOut, boolean authorative, VersionTag versionTag, + InternalDistributedMember responder) { final boolean isDebugEnabled = logger.isDebugEnabled(); + if (departedMembers != null && departedMembers.contains(responder)) { + if (isDebugEnabled) { + logger.debug("ignore the reply received from a departed member"); + } + return; + } if (this.requestInProgress) { if (requestorTimedOut) { @@ -1163,7 +1184,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { private synchronized void waitForObject2(final int timeoutMs) throws TimeoutException { if (this.requestInProgress) { try { - final DM dm = this.region.cache.getDistributedSystem().getDistributionManager(); + final DM dm = this.region.getCache().getDistributedSystem().getDistributionManager(); long waitTimeMs = timeoutMs; final long endTime = System.currentTimeMillis() + waitTimeMs; for (;;) { @@ -2018,7 +2039,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { this.versionTag.replaceNullIDs(getSender()); } processor.incomingNetSearchReply(this.value, lastModifiedSystemTime, this.isSerialized, - this.requestorTimedOut, this.authoritative, this.versionTag); + this.requestorTimedOut, this.authoritative, this.versionTag, getSender()); } public int getDSFID() { http://git-wip-us.apache.org/repos/asf/geode/blob/d497d63a/geode-core/src/test/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessorTest.java ---------------------------------------------------------------------- diff 
--git a/geode-core/src/test/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessorTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessorTest.java index bfe78b0..91ac16b 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessorTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessorTest.java @@ -14,15 +14,29 @@ */ package org.apache.geode.internal.cache; +import static org.junit.Assert.*; import static org.mockito.Mockito.*; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.TimeUnit; + import org.junit.Test; import org.junit.experimental.categories.Category; - +import org.apache.geode.CancelCriterion; +import org.apache.geode.cache.DataPolicy; +import org.apache.geode.cache.ExpirationAttributes; import org.apache.geode.cache.Operation; +import org.apache.geode.cache.RegionAttributes; import org.apache.geode.cache.Scope; +import org.apache.geode.distributed.internal.DM; +import org.apache.geode.distributed.internal.InternalDistributedSystem; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.versions.VersionTag; import org.apache.geode.internal.offheap.StoredObject; import org.apache.geode.test.junit.categories.UnitTest; +import org.awaitility.Awaitility; @Category(UnitTest.class) public class SearchLoadAndWriteProcessorTest { @@ -62,4 +76,90 @@ public class SearchLoadAndWriteProcessorTest { } } + InternalDistributedMember departedMember; + + @Test + public void verifyNoProcessingReplyFromADepartedMember() { + SearchLoadAndWriteProcessor processor = SearchLoadAndWriteProcessor.getProcessor(); + DistributedRegion lr = mock(DistributedRegion.class); + RegionAttributes attrs = mock(RegionAttributes.class); + GemFireCacheImpl cache = mock(GemFireCacheImpl.class); + InternalDistributedSystem ds = 
mock(InternalDistributedSystem.class); + DM dm = mock(DM.class); + CacheDistributionAdvisor advisor = mock(CacheDistributionAdvisor.class); + CachePerfStats stats = mock(CachePerfStats.class); + ExpirationAttributes expirationAttrs = mock(ExpirationAttributes.class); + InternalDistributedMember m1 = mock(InternalDistributedMember.class); + InternalDistributedMember m2 = mock(InternalDistributedMember.class); + Set<InternalDistributedMember> replicates = new HashSet<InternalDistributedMember>();; + replicates.add(m1); + replicates.add(m2); + + when(lr.getAttributes()).thenReturn(attrs); + when(lr.getSystem()).thenReturn(ds); + when(lr.getCache()).thenReturn(cache); + when(lr.getCacheDistributionAdvisor()).thenReturn(advisor); + when(lr.getDistributionManager()).thenReturn(dm); + when(lr.getCachePerfStats()).thenReturn(stats); + when(lr.getScope()).thenReturn(Scope.DISTRIBUTED_ACK); + when(lr.getCancelCriterion()).thenReturn(mock(CancelCriterion.class)); + when(cache.getDistributedSystem()).thenReturn(ds); + when(cache.getSearchTimeout()).thenReturn(30); + when(attrs.getScope()).thenReturn(Scope.DISTRIBUTED_ACK); + when(attrs.getDataPolicy()).thenReturn(DataPolicy.EMPTY); + when(attrs.getEntryTimeToLive()).thenReturn(expirationAttrs); + when(attrs.getEntryIdleTimeout()).thenReturn(expirationAttrs); + when(advisor.adviseInitializedReplicates()).thenReturn(replicates); + + Object key = "k1"; + byte[] v1 = "v1".getBytes(); + byte[] v2 = "v2".getBytes(); + EntryEventImpl event = EntryEventImpl.create(lr, Operation.GET, key, null, null, false, null); + + + Thread t1 = new Thread(new Runnable() { + public void run() { + Awaitility.await().pollInterval(10, TimeUnit.MILLISECONDS) + .pollDelay(10, TimeUnit.MILLISECONDS).atMost(30, TimeUnit.SECONDS) + .until(() -> processor.getSelectedNode() != null); + departedMember = processor.getSelectedNode(); + // Simulate member departed event + processor.memberDeparted(departedMember, true); + } + }); + t1.start(); + + Thread t2 = new 
Thread(new Runnable() { + public void run() { + Awaitility.await().pollInterval(10, TimeUnit.MILLISECONDS) + .pollDelay(10, TimeUnit.MILLISECONDS).atMost(30, TimeUnit.SECONDS) + .until(() -> departedMember != null && processor.getSelectedNode() != null + && departedMember != processor.getSelectedNode()); + + // Handle search result from the departed member + processor.incomingNetSearchReply(v1, System.currentTimeMillis(), false, false, true, + mock(VersionTag.class), departedMember); + } + }); + t2.start(); + + Thread t3 = new Thread(new Runnable() { + public void run() { + Awaitility.await().pollInterval(10, TimeUnit.MILLISECONDS) + .pollDelay(10, TimeUnit.MILLISECONDS).atMost(30, TimeUnit.SECONDS) + .until(() -> departedMember != null && processor.getSelectedNode() != null + && departedMember != processor.getSelectedNode()); + // Handle search result from a new member + processor.incomingNetSearchReply(v2, System.currentTimeMillis(), false, false, true, + mock(VersionTag.class), processor.getSelectedNode()); + } + }); + t3.start(); + + processor.initialize(lr, key, null); + processor.doSearchAndLoad(event, null, null); + + assertTrue(Arrays.equals((byte[]) event.getNewValue(), v2)); + } + }