Repository: geode Updated Branches: refs/heads/feature/GEODE-2745 [created] ba3b28adc
GEODE-2745: WaitUntilBucketRegionQueueFlushedCallable now reads BucketRegionQueue.latestQueuedKey in its constructor instead of when the callable is invoked. - Added a getter in BucketRegionQueue for latestQueuedKey - WaitUntilBucketRegionQueueFlushedCallable constructor now gets/maintains the BucketRegionQueue.latestQueuedKey Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/ba3b28ad Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/ba3b28ad Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/ba3b28ad Branch: refs/heads/feature/GEODE-2745 Commit: ba3b28adc48884bb5d697d307a28f4831f5d9301 Parents: 799548e Author: Lynn Hughes-Godfrey <lhughesgodf...@pivotal.io> Authored: Fri Apr 7 11:57:16 2017 -0700 Committer: Lynn Hughes-Godfrey <lhughesgodf...@pivotal.io> Committed: Fri Apr 7 11:57:16 2017 -0700 ---------------------------------------------------------------------- .../apache/geode/internal/cache/BucketRegionQueue.java | 13 +++++++++---- ...itUntilParallelGatewaySenderFlushedCoordinator.java | 5 ++++- 2 files changed, 13 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/ba3b28ad/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java index 3259752..2ee74ba 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java @@ -464,16 +464,21 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue { this.latestAcknowledgedKey.set(key); } - public boolean waitUntilFlushed(long timeout, TimeUnit unit) throws 
InterruptedException { + public long getLatestQueuedKey() { + return this.latestQueuedKey.get(); + } + + public boolean waitUntilFlushed(long latestQueuedKey, long timeout, TimeUnit unit) + throws InterruptedException { long then = System.currentTimeMillis(); if (logger.isDebugEnabled()) { - logger.debug("BucketRegionQueue: waitUntilFlushed bucket=" + getId() + "; timeout=" + timeout - + "; unit=" + unit); + logger.debug("BucketRegionQueue: waitUntilFlushed bucket=" + getId() + "; latestQueuedKey=" + + latestQueuedKey + "; timeout=" + timeout + "; unit=" + unit); } boolean result = false; // Wait until latestAcknowledgedKey > latestQueuedKey or the queue is empty if (this.initialized) { - long latestQueuedKeyToCheck = this.latestQueuedKey.get(); + long latestQueuedKeyToCheck = latestQueuedKey; long nanosRemaining = unit.toNanos(timeout); long endTime = System.nanoTime() + nanosRemaining; while (nanosRemaining > 0) { http://git-wip-us.apache.org/repos/asf/geode/blob/ba3b28ad/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java index c5945a6..1388dd0 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java @@ -100,6 +100,8 @@ public class WaitUntilParallelGatewaySenderFlushedCoordinator private BucketRegionQueue brq; + private long latestQueuedKey; + private long timeout; private TimeUnit unit; @@ -107,13 +109,14 @@ public class 
WaitUntilParallelGatewaySenderFlushedCoordinator public WaitUntilBucketRegionQueueFlushedCallable(BucketRegionQueue brq, long timeout, TimeUnit unit) { this.brq = brq; + this.latestQueuedKey = brq.getLatestQueuedKey(); this.timeout = timeout; this.unit = unit; } @Override public Boolean call() throws Exception { - return this.brq.waitUntilFlushed(this.timeout, this.unit); + return this.brq.waitUntilFlushed(this.latestQueuedKey, this.timeout, this.unit); } @Override