commit ef5f9a09822eb9a0cd207c8ef183587654beacd8
Merge: 4ad543d 1252cae
Author: Jason Huynh <[email protected]>
Date:   Fri May 12 16:32:23 2017 -0700

    Merge branch 'develop' into feature/GEODE-2900

commit 4ad543d600da57185cee5c6714df876d83f751c7
Author: Jason Huynh <[email protected]>
Date:   Fri May 12 16:24:05 2017 -0700

    Fix for NPE with stateflush

commit b95ed3c9d1226b5acf3afc5b3261da29230151eb
Author: Jason Huynh <[email protected]>
Date:   Fri May 12 15:18:35 2017 -0700

    Fixing failing test

commit da95ce199f1ec87e7466c14910411e89a3981e36
Author: Jason Huynh <[email protected]>
Date:   Fri May 12 10:12:46 2017 -0700

    StateFlush changes

commit 5b37efbd75631b99a24b533b62e5f31d7a2a7b4f
Author: Jason Huynh <[email protected]>
Date:   Fri May 12 10:12:35 2017 -0700

    NPE check

commit bbf016376b76c501f3051521ac46b995f6795781
Author: Jason Huynh <[email protected]>
Date:   Tue May 9 15:29:12 2017 -0700

    GEODE-2900: spotlessApply

commit 5b4e4330678f9eb49df2e647a9dd0a0f015d8047
Author: Jason Huynh <[email protected]>
Date:   Tue May 9 14:40:42 2017 -0700

    GEODE-2900: Renamed method and changed some code comments

commit 240b469ff217fbfba39381f196ae3e0e832a69a6
Author: Jason Huynh <[email protected]>
Date:   Tue May 9 10:11:39 2017 -0700

    GEODE-2900: push shadow key back into the front of the eventSeqNumber "Queue"


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/3c55bd7f
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/3c55bd7f
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/3c55bd7f

Branch: refs/heads/feature/GEODE-2900
Commit: 3c55bd7fafcec99a1d8f3fa297a01b0343f6bf5f
Parents: 1252cae
Author: Jason Huynh <[email protected]>
Authored: Fri May 12 16:50:02 2017 -0700
Committer: Jason Huynh <[email protected]>
Committed: Fri May 12 16:50:02 2017 -0700

----------------------------------------------------------------------
 .../geode/internal/cache/BucketRegionQueue.java | 38 ++++++++------
 .../internal/cache/StateFlushOperation.java     | 55 +++++++++++++-------
 .../parallel/ParallelGatewaySenderQueue.java    |  6 ++-
 .../geode/internal/cache/GIIDeltaDUnitTest.java |  3 +-
 4 files changed, 65 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/3c55bd7f/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
index 7a21d12..e9a74e7 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
@@ -27,8 +27,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -71,10 +73,10 @@ public class BucketRegionQueue extends 
AbstractBucketRegionQueue {
   private final Map indexes;
 
   /**
-   * A transient queue to maintain the eventSeqNum of the events that are to 
be sent to remote site.
-   * It is cleared when the queue is cleared.
+   * A transient deque, but should be treated like a FIFO queue to maintain 
the eventSeqNum of
+   * the events that are to be sent to remote site. It is cleared when the 
queue is cleared.
    */
-  private final BlockingQueue<Object> eventSeqNumQueue = new 
LinkedBlockingQueue<Object>();
+  private final BlockingDeque<Object> eventSeqNumDeque = new 
LinkedBlockingDeque<Object>();
 
   // private final BlockingQueue<EventID> eventSeqNumQueueWithEventId = new
   // LinkedBlockingQueue<EventID>();
@@ -139,7 +141,7 @@ public class BucketRegionQueue extends 
AbstractBucketRegionQueue {
             }
           });
           for (EventID eventID : keys) {
-            eventSeqNumQueue.add(eventID);
+            eventSeqNumDeque.addLast(eventID);
           }
         } else {
           TreeSet<Long> sortedKeys = new TreeSet<Long>(this.keySet());
@@ -150,7 +152,7 @@ public class BucketRegionQueue extends 
AbstractBucketRegionQueue {
           // fix for #49679 NoSuchElementException thrown from 
BucketRegionQueue.initialize
           if (!sortedKeys.isEmpty()) {
             for (Long key : sortedKeys) {
-              eventSeqNumQueue.add(key);
+              eventSeqNumDeque.addLast(key);
             }
             lastKeyRecovered = sortedKeys.last();
             if (this.getEventSeqNum() != null) {
@@ -162,7 +164,7 @@ public class BucketRegionQueue extends 
AbstractBucketRegionQueue {
         if (logger.isDebugEnabled()) {
           logger.debug(
               "For bucket {} ,total keys recovered are : {} last key recovered 
is : {} and the seqNo is ",
-              getId(), eventSeqNumQueue.size(), lastKeyRecovered, 
getEventSeqNum());
+              getId(), eventSeqNumDeque.size(), lastKeyRecovered, 
getEventSeqNum());
         }
       }
       this.initialized = true;
@@ -211,7 +213,7 @@ public class BucketRegionQueue extends 
AbstractBucketRegionQueue {
   @Override
   public void beforeAcquiringPrimaryState() {
     int batchSize = 
this.getPartitionedRegion().getParallelGatewaySender().getBatchSize();
-    Iterator<Object> itr = eventSeqNumQueue.iterator();
+    Iterator<Object> itr = eventSeqNumDeque.iterator();
     markEventsAsDuplicate(batchSize, itr);
   }
 
@@ -224,7 +226,7 @@ public class BucketRegionQueue extends 
AbstractBucketRegionQueue {
       }
     });
     this.indexes.clear();
-    this.eventSeqNumQueue.clear();
+    this.eventSeqNumDeque.clear();
   }
 
   @Override
@@ -236,7 +238,7 @@ public class BucketRegionQueue extends 
AbstractBucketRegionQueue {
         result.set(BucketRegionQueue.super.clearEntries(rvv));
       }
     });
-    this.eventSeqNumQueue.clear();
+    this.eventSeqNumDeque.clear();
     return result.get();
   }
 
@@ -250,7 +252,7 @@ public class BucketRegionQueue extends 
AbstractBucketRegionQueue {
     getInitializationLock().writeLock().lock();
     try {
       this.indexes.clear();
-      this.eventSeqNumQueue.clear();
+      this.eventSeqNumDeque.clear();
     } finally {
       getInitializationLock().writeLock().unlock();
     }
@@ -377,7 +379,7 @@ public class BucketRegionQueue extends 
AbstractBucketRegionQueue {
       if (logger.isDebugEnabled()) {
         logger.debug(" removing the key {} from eventSeqNumQueue", 
event.getKey());
       }
-      this.eventSeqNumQueue.remove(event.getKey());
+      this.eventSeqNumDeque.remove(event.getKey());
     }
   }
 
@@ -412,7 +414,7 @@ public class BucketRegionQueue extends 
AbstractBucketRegionQueue {
       if (this.getPartitionedRegion().isDestroyed()) {
         throw new BucketRegionQueueUnavailableException();
       }
-      key = this.eventSeqNumQueue.peek();
+      key = this.eventSeqNumDeque.peekFirst();
       if (key != null) {
         object = optimalGet(key);
         if (object == null && 
!this.getPartitionedRegion().isConflationEnabled()) {
@@ -431,7 +433,7 @@ public class BucketRegionQueue extends 
AbstractBucketRegionQueue {
         // RegionQueue[1])[0];
         // //queue.addToPeekedKeys(key);
         // }
-        this.eventSeqNumQueue.remove(key);
+        this.eventSeqNumDeque.remove(key);
       }
       return object; // OFFHEAP: ok since callers are careful to do destroys on
                      // region queue after finished with peeked object.
@@ -443,7 +445,7 @@ public class BucketRegionQueue extends 
AbstractBucketRegionQueue {
   protected void addToEventQueue(Object key, boolean didPut, EntryEventImpl 
event) {
     if (didPut) {
       if (this.initialized) {
-        this.eventSeqNumQueue.add(key);
+        this.eventSeqNumDeque.addLast(key);
         updateLargestQueuedKey((Long) key);
       }
       if (logger.isDebugEnabled()) {
@@ -456,6 +458,10 @@ public class BucketRegionQueue extends 
AbstractBucketRegionQueue {
     }
   }
 
+  public void pushKeyIntoQueue(Object key) {
+    eventSeqNumDeque.addFirst(key);
+  }
+
   private void updateLargestQueuedKey(Long key) {
     Atomics.setIfGreater(this.latestQueuedKey, key);
   }
@@ -510,7 +516,7 @@ public class BucketRegionQueue extends 
AbstractBucketRegionQueue {
    * @throws ForceReattemptException
    */
   public Object remove() throws ForceReattemptException {
-    Object key = this.eventSeqNumQueue.remove();
+    Object key = this.eventSeqNumDeque.removeFirst();
     if (key != null) {
       destroyKey(key);
     }
@@ -586,7 +592,7 @@ public class BucketRegionQueue extends 
AbstractBucketRegionQueue {
 
   public boolean isReadyForPeek() {
     return !this.getPartitionedRegion().isDestroyed() && !this.isEmpty()
-        && !this.eventSeqNumQueue.isEmpty() && getBucketAdvisor().isPrimary();
+        && !this.eventSeqNumDeque.isEmpty() && getBucketAdvisor().isPrimary();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/3c55bd7f/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
index eb93b76..34a6742 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
@@ -357,6 +357,16 @@ public class StateFlushOperation {
       if (dm.getDistributionManagerId().equals(relayRecipient)) {
         // no need to send a relay request to this process - just send the
         // ack back to the sender
+        Set<DistributedRegion> regions = getRegions(dm);
+        for (DistributedRegion r : regions) {
+          if (r != null) {
+            if (this.allRegions && r.doesNotDistribute()) {
+              // no need to flush a region that does no distribution
+              continue;
+            }
+            waitForCurrentOperations(r, r.isInitialized());
+          }
+        }
         StateStabilizedMessage ga = new StateStabilizedMessage();
         ga.sendingMember = relayRecipient;
         ga.setRecipient(this.getSender());
@@ -374,12 +384,7 @@ public class StateFlushOperation {
         gr.requestingMember = this.getSender();
         gr.processorId = processorId;
         try {
-          Set<DistributedRegion> regions;
-          if (this.allRegions) {
-            regions = getAllRegions(dm);
-          } else {
-            regions = Collections.singleton(this.getRegion(dm));
-          }
+          Set<DistributedRegion> regions = getRegions(dm);
           for (DistributedRegion r : regions) {
             if (r == null) {
               if (logger.isTraceEnabled(LogMarker.DM)) {
@@ -392,18 +397,7 @@ public class StateFlushOperation {
                 continue;
               }
               boolean initialized = r.isInitialized();
-              if (initialized) {
-                if (this.flushNewOps) {
-                  r.getDistributionAdvisor().forceNewMembershipVersion(); // 
force a new "view" so
-                                                                          // 
we can track current
-                                                                          // 
ops
-                }
-                try {
-                  r.getDistributionAdvisor().waitForCurrentOperations();
-                } catch (RegionDestroyedException e) {
-                  // continue with the next region
-                }
-              }
+              waitForCurrentOperations(r, initialized);
               boolean useMulticast =
                   r.getMulticastEnabled() && 
r.getSystem().getConfig().getMcastPort() != 0;
               if (initialized) {
@@ -455,6 +449,31 @@ public class StateFlushOperation {
       }
     }
 
+    private void waitForCurrentOperations(final DistributedRegion r, final 
boolean initialized) {
+      if (initialized) {
+        if (this.flushNewOps) {
+          r.getDistributionAdvisor().forceNewMembershipVersion(); // force a 
new "view" so
+                                                                  // we can 
track current
+                                                                  // ops
+        }
+        try {
+          r.getDistributionAdvisor().waitForCurrentOperations();
+        } catch (RegionDestroyedException e) {
+          // continue with the next region
+        }
+      }
+    }
+
+    private Set<DistributedRegion> getRegions(final DistributionManager dm) {
+      Set<DistributedRegion> regions;
+      if (this.allRegions) {
+        regions = getAllRegions(dm);
+      } else {
+        regions = Collections.singleton(this.getRegion(dm));
+      }
+      return regions;
+    }
+
     @Override
     public void toData(DataOutput dout) throws IOException {
       super.toData(dout);

http://git-wip-us.apache.org/repos/asf/geode/blob/3c55bd7f/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 82e6f68..9b55abb 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -56,7 +56,6 @@ import 
org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
 import org.apache.geode.distributed.internal.DM;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.AbstractBucketRegionQueue;
 import org.apache.geode.internal.cache.BucketNotFoundException;
 import org.apache.geode.internal.cache.BucketRegion;
@@ -81,7 +80,6 @@ import 
org.apache.geode.internal.cache.wan.GatewaySenderConfigurationException;
 import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
 import org.apache.geode.internal.cache.wan.GatewaySenderException;
 import org.apache.geode.internal.cache.wan.GatewaySenderStats;
-import 
org.apache.geode.internal.cache.wan.parallel.ParallelQueueBatchRemovalMessage.ParallelQueueBatchRemovalResponse;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.LoggingThreadGroup;
@@ -1357,6 +1355,10 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
         final PartitionedRegion region = (PartitionedRegion) event.getRegion();
         if (!region.getRegionAdvisor().isPrimaryForBucket(bucketId)) {
           iterator.remove();
+          BucketRegionQueue brq = 
getBucketRegionQueueByBucketId(getRandomShadowPR(), bucketId);
+          if (brq != null) {
+            brq.pushKeyIntoQueue(event.getShadowKey());
+          }
         }
       }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/3c55bd7f/geode-core/src/test/java/org/apache/geode/internal/cache/GIIDeltaDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/GIIDeltaDUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/GIIDeltaDUnitTest.java
index bfa30c3..e80b82d 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/GIIDeltaDUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/GIIDeltaDUnitTest.java
@@ -771,7 +771,8 @@ public class GIIDeltaDUnitTest extends JUnit4CacheTestCase {
     // now P's RVV=P9,R6(3-6), RVVGC=P8,R0, R's RVV=P9(7-9), R6
     waitForToVerifyRVV(P, memberP, 9, null, 8); // P's rvv=p9, gc=8
     waitForToVerifyRVV(P, memberR, 6, exceptionlist, 0); // P's rvv=r6(3-6), 
gc=0
-
+    P.invoke(() -> GIIDeltaDUnitTest.resetSlowGII());
+    
     // restart and gii, R's rvv should be the same as P's
     checkIfFullGII(P, REGION_NAME, R_rvv_bytes, true);
     createDistributedRegion(R);

Reply via email to