commit ef5f9a09822eb9a0cd207c8ef183587654beacd8
Merge: 4ad543d 1252cae
Author: Jason Huynh <[email protected]>
Date: Fri May 12 16:32:23 2017 -0700
Merge branch 'develop' into feature/GEODE-2900
commit 4ad543d600da57185cee5c6714df876d83f751c7
Author: Jason Huynh <[email protected]>
Date: Fri May 12 16:24:05 2017 -0700
Fix for NPE with stateflush
commit b95ed3c9d1226b5acf3afc5b3261da29230151eb
Author: Jason Huynh <[email protected]>
Date: Fri May 12 15:18:35 2017 -0700
Fixing failing test
commit da95ce199f1ec87e7466c14910411e89a3981e36
Author: Jason Huynh <[email protected]>
Date: Fri May 12 10:12:46 2017 -0700
StateFlush changes
commit 5b37efbd75631b99a24b533b62e5f31d7a2a7b4f
Author: Jason Huynh <[email protected]>
Date: Fri May 12 10:12:35 2017 -0700
NPE check
commit bbf016376b76c501f3051521ac46b995f6795781
Author: Jason Huynh <[email protected]>
Date: Tue May 9 15:29:12 2017 -0700
GEODE-2900: spotlessApply
commit 5b4e4330678f9eb49df2e647a9dd0a0f015d8047
Author: Jason Huynh <[email protected]>
Date: Tue May 9 14:40:42 2017 -0700
GEODE-2900: Renamed method and changed some code comments
commit 240b469ff217fbfba39381f196ae3e0e832a69a6
Author: Jason Huynh <[email protected]>
Date: Tue May 9 10:11:39 2017 -0700
GEODE-2900: push shadow key back into the front of the eventSeqNumber
"Queue"
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/3c55bd7f
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/3c55bd7f
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/3c55bd7f
Branch: refs/heads/feature/GEODE-2900
Commit: 3c55bd7fafcec99a1d8f3fa297a01b0343f6bf5f
Parents: 1252cae
Author: Jason Huynh <[email protected]>
Authored: Fri May 12 16:50:02 2017 -0700
Committer: Jason Huynh <[email protected]>
Committed: Fri May 12 16:50:02 2017 -0700
----------------------------------------------------------------------
.../geode/internal/cache/BucketRegionQueue.java | 38 ++++++++------
.../internal/cache/StateFlushOperation.java | 55 +++++++++++++-------
.../parallel/ParallelGatewaySenderQueue.java | 6 ++-
.../geode/internal/cache/GIIDeltaDUnitTest.java | 3 +-
4 files changed, 65 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/3c55bd7f/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
----------------------------------------------------------------------
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
index 7a21d12..e9a74e7 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
@@ -27,8 +27,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
+import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -71,10 +73,10 @@ public class BucketRegionQueue extends
AbstractBucketRegionQueue {
private final Map indexes;
/**
- * A transient queue to maintain the eventSeqNum of the events that are to
be sent to remote site.
- * It is cleared when the queue is cleared.
+ * A transient deque, but it should be treated as a FIFO queue that maintains
the eventSeqNum of
+ * the events that are to be sent to the remote site. It is cleared when the
queue is cleared.
*/
- private final BlockingQueue<Object> eventSeqNumQueue = new
LinkedBlockingQueue<Object>();
+ private final BlockingDeque<Object> eventSeqNumDeque = new
LinkedBlockingDeque<Object>();
// private final BlockingQueue<EventID> eventSeqNumQueueWithEventId = new
// LinkedBlockingQueue<EventID>();
@@ -139,7 +141,7 @@ public class BucketRegionQueue extends
AbstractBucketRegionQueue {
}
});
for (EventID eventID : keys) {
- eventSeqNumQueue.add(eventID);
+ eventSeqNumDeque.addLast(eventID);
}
} else {
TreeSet<Long> sortedKeys = new TreeSet<Long>(this.keySet());
@@ -150,7 +152,7 @@ public class BucketRegionQueue extends
AbstractBucketRegionQueue {
// fix for #49679 NoSuchElementException thrown from
BucketRegionQueue.initialize
if (!sortedKeys.isEmpty()) {
for (Long key : sortedKeys) {
- eventSeqNumQueue.add(key);
+ eventSeqNumDeque.addLast(key);
}
lastKeyRecovered = sortedKeys.last();
if (this.getEventSeqNum() != null) {
@@ -162,7 +164,7 @@ public class BucketRegionQueue extends
AbstractBucketRegionQueue {
if (logger.isDebugEnabled()) {
logger.debug(
"For bucket {} ,total keys recovered are : {} last key recovered
is : {} and the seqNo is ",
- getId(), eventSeqNumQueue.size(), lastKeyRecovered,
getEventSeqNum());
+ getId(), eventSeqNumDeque.size(), lastKeyRecovered,
getEventSeqNum());
}
}
this.initialized = true;
@@ -211,7 +213,7 @@ public class BucketRegionQueue extends
AbstractBucketRegionQueue {
@Override
public void beforeAcquiringPrimaryState() {
int batchSize =
this.getPartitionedRegion().getParallelGatewaySender().getBatchSize();
- Iterator<Object> itr = eventSeqNumQueue.iterator();
+ Iterator<Object> itr = eventSeqNumDeque.iterator();
markEventsAsDuplicate(batchSize, itr);
}
@@ -224,7 +226,7 @@ public class BucketRegionQueue extends
AbstractBucketRegionQueue {
}
});
this.indexes.clear();
- this.eventSeqNumQueue.clear();
+ this.eventSeqNumDeque.clear();
}
@Override
@@ -236,7 +238,7 @@ public class BucketRegionQueue extends
AbstractBucketRegionQueue {
result.set(BucketRegionQueue.super.clearEntries(rvv));
}
});
- this.eventSeqNumQueue.clear();
+ this.eventSeqNumDeque.clear();
return result.get();
}
@@ -250,7 +252,7 @@ public class BucketRegionQueue extends
AbstractBucketRegionQueue {
getInitializationLock().writeLock().lock();
try {
this.indexes.clear();
- this.eventSeqNumQueue.clear();
+ this.eventSeqNumDeque.clear();
} finally {
getInitializationLock().writeLock().unlock();
}
@@ -377,7 +379,7 @@ public class BucketRegionQueue extends
AbstractBucketRegionQueue {
if (logger.isDebugEnabled()) {
logger.debug(" removing the key {} from eventSeqNumQueue",
event.getKey());
}
- this.eventSeqNumQueue.remove(event.getKey());
+ this.eventSeqNumDeque.remove(event.getKey());
}
}
@@ -412,7 +414,7 @@ public class BucketRegionQueue extends
AbstractBucketRegionQueue {
if (this.getPartitionedRegion().isDestroyed()) {
throw new BucketRegionQueueUnavailableException();
}
- key = this.eventSeqNumQueue.peek();
+ key = this.eventSeqNumDeque.peekFirst();
if (key != null) {
object = optimalGet(key);
if (object == null &&
!this.getPartitionedRegion().isConflationEnabled()) {
@@ -431,7 +433,7 @@ public class BucketRegionQueue extends
AbstractBucketRegionQueue {
// RegionQueue[1])[0];
// //queue.addToPeekedKeys(key);
// }
- this.eventSeqNumQueue.remove(key);
+ this.eventSeqNumDeque.remove(key);
}
return object; // OFFHEAP: ok since callers are careful to do destroys on
// region queue after finished with peeked object.
@@ -443,7 +445,7 @@ public class BucketRegionQueue extends
AbstractBucketRegionQueue {
protected void addToEventQueue(Object key, boolean didPut, EntryEventImpl
event) {
if (didPut) {
if (this.initialized) {
- this.eventSeqNumQueue.add(key);
+ this.eventSeqNumDeque.addLast(key);
updateLargestQueuedKey((Long) key);
}
if (logger.isDebugEnabled()) {
@@ -456,6 +458,10 @@ public class BucketRegionQueue extends
AbstractBucketRegionQueue {
}
}
+ public void pushKeyIntoQueue(Object key) {
+ eventSeqNumDeque.addFirst(key);
+ }
+
private void updateLargestQueuedKey(Long key) {
Atomics.setIfGreater(this.latestQueuedKey, key);
}
@@ -510,7 +516,7 @@ public class BucketRegionQueue extends
AbstractBucketRegionQueue {
* @throws ForceReattemptException
*/
public Object remove() throws ForceReattemptException {
- Object key = this.eventSeqNumQueue.remove();
+ Object key = this.eventSeqNumDeque.removeFirst();
if (key != null) {
destroyKey(key);
}
@@ -586,7 +592,7 @@ public class BucketRegionQueue extends
AbstractBucketRegionQueue {
public boolean isReadyForPeek() {
return !this.getPartitionedRegion().isDestroyed() && !this.isEmpty()
- && !this.eventSeqNumQueue.isEmpty() && getBucketAdvisor().isPrimary();
+ && !this.eventSeqNumDeque.isEmpty() && getBucketAdvisor().isPrimary();
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/3c55bd7f/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
----------------------------------------------------------------------
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
index eb93b76..34a6742 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
@@ -357,6 +357,16 @@ public class StateFlushOperation {
if (dm.getDistributionManagerId().equals(relayRecipient)) {
// no need to send a relay request to this process - just send the
// ack back to the sender
+ Set<DistributedRegion> regions = getRegions(dm);
+ for (DistributedRegion r : regions) {
+ if (r != null) {
+ if (this.allRegions && r.doesNotDistribute()) {
+ // no need to flush a region that does no distribution
+ continue;
+ }
+ waitForCurrentOperations(r, r.isInitialized());
+ }
+ }
StateStabilizedMessage ga = new StateStabilizedMessage();
ga.sendingMember = relayRecipient;
ga.setRecipient(this.getSender());
@@ -374,12 +384,7 @@ public class StateFlushOperation {
gr.requestingMember = this.getSender();
gr.processorId = processorId;
try {
- Set<DistributedRegion> regions;
- if (this.allRegions) {
- regions = getAllRegions(dm);
- } else {
- regions = Collections.singleton(this.getRegion(dm));
- }
+ Set<DistributedRegion> regions = getRegions(dm);
for (DistributedRegion r : regions) {
if (r == null) {
if (logger.isTraceEnabled(LogMarker.DM)) {
@@ -392,18 +397,7 @@ public class StateFlushOperation {
continue;
}
boolean initialized = r.isInitialized();
- if (initialized) {
- if (this.flushNewOps) {
- r.getDistributionAdvisor().forceNewMembershipVersion(); //
force a new "view" so
- //
we can track current
- //
ops
- }
- try {
- r.getDistributionAdvisor().waitForCurrentOperations();
- } catch (RegionDestroyedException e) {
- // continue with the next region
- }
- }
+ waitForCurrentOperations(r, initialized);
boolean useMulticast =
r.getMulticastEnabled() &&
r.getSystem().getConfig().getMcastPort() != 0;
if (initialized) {
@@ -455,6 +449,31 @@ public class StateFlushOperation {
}
}
+ private void waitForCurrentOperations(final DistributedRegion r, final
boolean initialized) {
+ if (initialized) {
+ if (this.flushNewOps) {
+ r.getDistributionAdvisor().forceNewMembershipVersion(); // force a
new "view" so
+ // we can
track current
+ // ops
+ }
+ try {
+ r.getDistributionAdvisor().waitForCurrentOperations();
+ } catch (RegionDestroyedException e) {
+ // continue with the next region
+ }
+ }
+ }
+
+ private Set<DistributedRegion> getRegions(final DistributionManager dm) {
+ Set<DistributedRegion> regions;
+ if (this.allRegions) {
+ regions = getAllRegions(dm);
+ } else {
+ regions = Collections.singleton(this.getRegion(dm));
+ }
+ return regions;
+ }
+
@Override
public void toData(DataOutput dout) throws IOException {
super.toData(dout);
http://git-wip-us.apache.org/repos/asf/geode/blob/3c55bd7f/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 82e6f68..9b55abb 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -56,7 +56,6 @@ import
org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
import org.apache.geode.distributed.internal.DM;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.AbstractBucketRegionQueue;
import org.apache.geode.internal.cache.BucketNotFoundException;
import org.apache.geode.internal.cache.BucketRegion;
@@ -81,7 +80,6 @@ import
org.apache.geode.internal.cache.wan.GatewaySenderConfigurationException;
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
import org.apache.geode.internal.cache.wan.GatewaySenderException;
import org.apache.geode.internal.cache.wan.GatewaySenderStats;
-import
org.apache.geode.internal.cache.wan.parallel.ParallelQueueBatchRemovalMessage.ParallelQueueBatchRemovalResponse;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.LoggingThreadGroup;
@@ -1357,6 +1355,10 @@ public class ParallelGatewaySenderQueue implements
RegionQueue {
final PartitionedRegion region = (PartitionedRegion) event.getRegion();
if (!region.getRegionAdvisor().isPrimaryForBucket(bucketId)) {
iterator.remove();
+ BucketRegionQueue brq =
getBucketRegionQueueByBucketId(getRandomShadowPR(), bucketId);
+ if (brq != null) {
+ brq.pushKeyIntoQueue(event.getShadowKey());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/3c55bd7f/geode-core/src/test/java/org/apache/geode/internal/cache/GIIDeltaDUnitTest.java
----------------------------------------------------------------------
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/GIIDeltaDUnitTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/GIIDeltaDUnitTest.java
index bfa30c3..e80b82d 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/GIIDeltaDUnitTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/GIIDeltaDUnitTest.java
@@ -771,7 +771,8 @@ public class GIIDeltaDUnitTest extends JUnit4CacheTestCase {
// now P's RVV=P9,R6(3-6), RVVGC=P8,R0, R's RVV=P9(7-9), R6
waitForToVerifyRVV(P, memberP, 9, null, 8); // P's rvv=p9, gc=8
waitForToVerifyRVV(P, memberR, 6, exceptionlist, 0); // P's rvv=r6(3-6),
gc=0
-
+ P.invoke(() -> GIIDeltaDUnitTest.resetSlowGII());
+
// restart and gii, R's rvv should be the same as P's
checkIfFullGII(P, REGION_NAME, R_rvv_bytes, true);
createDistributedRegion(R);