http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java index 82e6f68..f09bb47 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java @@ -56,7 +56,6 @@ import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl; import org.apache.geode.distributed.internal.DM; import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; -import org.apache.geode.internal.Version; import org.apache.geode.internal.cache.AbstractBucketRegionQueue; import org.apache.geode.internal.cache.BucketNotFoundException; import org.apache.geode.internal.cache.BucketRegion; @@ -67,7 +66,7 @@ import org.apache.geode.internal.cache.DiskRegionStats; import org.apache.geode.internal.cache.DistributedRegion; import org.apache.geode.internal.cache.EntryEventImpl; import org.apache.geode.internal.cache.ForceReattemptException; -import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.InternalRegionArguments; import org.apache.geode.internal.cache.LocalRegion; import org.apache.geode.internal.cache.PartitionedRegion; @@ -81,7 +80,6 @@ import org.apache.geode.internal.cache.wan.GatewaySenderConfigurationException; import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl; import org.apache.geode.internal.cache.wan.GatewaySenderException; import org.apache.geode.internal.cache.wan.GatewaySenderStats; -import org.apache.geode.internal.cache.wan.parallel.ParallelQueueBatchRemovalMessage.ParallelQueueBatchRemovalResponse; import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.LogService; import org.apache.geode.internal.logging.LoggingThreadGroup; @@ -119,8 +117,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue { protected volatile boolean resetLastPeeked = false; - - /** * There will be one shadow pr for each of the the PartitionedRegion which has added the * GatewaySender Fix for Bug#45917 We maintain a tempQueue to queue events when buckets are not @@ -134,8 +130,10 @@ public class ParallelGatewaySenderQueue implements RegionQueue { * the secondary nodes to remove the events which have already been dispatched from the queue. */ public static final int DEFAULT_MESSAGE_SYNC_INTERVAL = 10; + // TODO:REF: how to change the message sync interval ? should it be common for serial and parallel protected static volatile int messageSyncInterval = DEFAULT_MESSAGE_SYNC_INTERVAL; + // TODO:REF: name change for thread, as it appears in the log private BatchRemovalThread removalThread = null; @@ -223,16 +221,11 @@ public class ParallelGatewaySenderQueue implements RegionQueue { } final protected int index; + final protected int nDispatcher; private MetaRegionFactory metaRegionFactory; - /** - * A transient queue to maintain the eventSeqNum of the events that are to be sent to remote site. - * It is cleared when the queue is cleared. - */ - // private final BlockingQueue<Long> eventSeqNumQueue; - public ParallelGatewaySenderQueue(AbstractGatewaySender sender, Set<Region> userRegions, int idx, int nDispatcher) { this(sender, userRegions, idx, nDispatcher, new MetaRegionFactory()); @@ -249,7 +242,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue { this.sender = sender; List<Region> listOfRegions = new ArrayList<Region>(userRegions); - // eventSeqNumQueue = new LinkedBlockingQueue<Long>(); Collections.sort(listOfRegions, new Comparator<Region>() { @Override public int compare(Region o1, Region o2) { @@ -273,7 +265,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue { throw new GatewaySenderConfigurationException( LocalizedStrings.ParallelGatewaySender_0_CAN_NOT_BE_USED_WITH_REPLICATED_REGION_1 .toLocalizedString(new Object[] {this.sender.getId(), userRegion.getFullPath()})); - // addShadowPartitionedRegionForUserRR((DistributedRegion)userRegion); } } @@ -295,7 +286,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue { // still, it is safer approach to synchronize it synchronized (ParallelGatewaySenderQueue.class) { if (removalThread == null) { - removalThread = new BatchRemovalThread((GemFireCacheImpl) this.sender.getCache(), this); + removalThread = new BatchRemovalThread(this.sender.getCache(), this); removalThread.start(); } } @@ -317,7 +308,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue { if (this.userRegionNameToshadowPRMap.containsKey(regionName)) return; - GemFireCacheImpl cache = (GemFireCacheImpl) sender.getCache(); + InternalCache cache = sender.getCache(); final String prQName = getQueueName(sender.getId(), userRegion.getFullPath()); prQ = (PartitionedRegion) cache.getRegion(prQName); if (prQ == null) { @@ -375,8 +366,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue { prQ.getPartitionAttributes()); } - // Suranjan: TODO This should not be set on the PR but on the - // GatewaySender + // TODO This should not be set on the PR but on the GatewaySender prQ.enableConflation(sender.isBatchConflationEnabled()); // Before going ahead, make sure all the buckets of shadowPR are @@ -391,32 +381,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue { } // In case of Replicated Region it may not be necessary. - // if (sender.isPersistenceEnabled()) { - // //Kishor: I need to write a test for this code. - // Set<Integer> allBucketsClone = new HashSet<Integer>(); - // // allBucketsClone.addAll(allBuckets);*/ - // for (int i = 0; i < sender.getMaxParallelismForReplicatedRegion(); i++) - // allBucketsClone.add(i); - // - // while (!(allBucketsClone.size() == 0)) { - // Iterator<Integer> itr = allBucketsClone.iterator(); - // while (itr.hasNext()) { - // InternalDistributedMember node = prQ.getNodeForBucketWrite( - // itr.next(), null); - // if (node != null) { - // itr.remove(); - // } - // } - // // after the iteration is over, sleep for sometime before trying - // // again - // try { - // Thread.sleep(WAIT_CYCLE_SHADOW_BUCKET_LOAD); - // } - // catch (InterruptedException e) { - // logger.error(e); - // } - // } - // } } catch (IOException veryUnLikely) { logger.fatal(LocalizedMessage.create( LocalizedStrings.SingleWriteSingleReadRegionQueue_UNEXPECTED_EXCEPTION_DURING_INIT_OF_0, @@ -453,7 +417,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue { } private static String convertPathToName(String fullPath) { - // return fullPath.replaceAll("/", "_"); return ""; } @@ -490,7 +453,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue { .toLocalizedString(new Object[] {this.sender.getId(), userPR.getFullPath()})); } - GemFireCacheImpl cache = (GemFireCacheImpl) sender.getCache(); + InternalCache cache = sender.getCache(); boolean isAccessor = (userPR.getLocalMaxMemory() == 0); final String prQName = sender.getId() + QSTRING + convertPathToName(userPR.getFullPath()); @@ -549,7 +512,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue { .setInternalRegion(true).setSnapshotInputStream(null).setImageTarget(null)); // at this point we should be able to assert prQ == meta; - // Suranjan: TODO This should not be set on the PR but on the GatewaySender + // TODO This should not be set on the PR but on the GatewaySender prQ.enableConflation(sender.isBatchConflationEnabled()); if (isAccessor) return; // return from here if accessor node @@ -576,7 +539,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue { } finally { if (prQ != null) { - this.userRegionNameToshadowPRMap.put(userPR.getFullPath(), prQ); } /* @@ -611,7 +573,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue { } protected void afterRegionAdd(PartitionedRegion userPR) { - + // nothing } /** @@ -666,18 +628,12 @@ public class ParallelGatewaySenderQueue implements RegionQueue { public boolean put(Object object) throws InterruptedException, CacheException { final boolean isDebugEnabled = logger.isDebugEnabled(); boolean putDone = false; - // Suranjan : Can this region ever be null? Should we work with regionName and not with region + // Can this region ever be null? Should we work with regionName and not with region // instance. // It can't be as put is happeing on the region and its still under process GatewaySenderEventImpl value = (GatewaySenderEventImpl) object; boolean isDREvent = isDREvent(value); - // if (isDREvent(value)) { - // putInShadowPRForReplicatedRegion(object); - // value.freeOffHeapValue(); - // return; - // } - Region region = value.getRegion(); String regionPath = null; if (isDREvent) { @@ -795,11 +751,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue { brq.getInitializationLock().readLock().unlock(); } } else { - // tempQueue = this.bucketToTempQueueMap.get(bucketId); - // if (tempQueue == null) { - // tempQueue = new LinkedBlockingQueue(); - // this.bucketToTempQueueMap.put(bucketId, tempQueue); - // } tempQueue.add(value); putDone = true; // For debugging purpose. @@ -811,7 +762,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue { } } } - // } } } finally { @@ -873,12 +823,9 @@ public class ParallelGatewaySenderQueue implements RegionQueue { try { if (brq != null) { addedValueToQueue = brq.addToQueue(key, value); - // TODO : Kishor : During merge, ParallelWANstats test failed. On + // TODO: During merge, ParallelWANstats test failed. On // comment below code test passed. cheetha does not have below code. // need to find out from hcih revision this code came - // if (brq.getBucketAdvisor().isPrimary()) { - // this.stats.incQueueSize(); - // } } } catch (BucketNotFoundException e) { if (logger.isDebugEnabled()) { @@ -933,18 +880,13 @@ public class ParallelGatewaySenderQueue implements RegionQueue { return new HashSet(this.userRegionNameToshadowPRMap.values()); } - // TODO: Suranjan Find optimal way to get Random shadow pr as this will be called in each put and - // peek. + // TODO: Find optimal way to get Random shadow pr as this will be called in each put and peek. protected PartitionedRegion getRandomShadowPR() { PartitionedRegion prQ = null; if (this.userRegionNameToshadowPRMap.values().size() > 0) { int randomIndex = new Random().nextInt(this.userRegionNameToshadowPRMap.size()); prQ = (PartitionedRegion) this.userRegionNameToshadowPRMap.values().toArray()[randomIndex]; } - // if (this.userPRToshadowPRMap.values().size() > 0 - // && (prQ == null)) { - // prQ = getRandomShadowPR(); - // } return prQ; } @@ -1029,13 +971,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue { // TODO:REF: instead of shuffle use random number, in this method we are // returning id instead we should return BRQ itself - /* - * Collections.shuffle(thisProcessorBuckets); for (Integer bucketId : thisProcessorBuckets) { - * BucketRegionQueue br = (BucketRegionQueue)prQ.getDataStore() - * .getBucketRegionQueueByBucketId(bucketId); - * - * if (br != null && br.isReadyForPeek()) { return br.getId(); } } - */ } return -1; } @@ -1052,9 +987,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue { GatewaySenderEventImpl event = this.peekedEvents.remove(); try { - // PartitionedRegion prQ = this.userPRToshadowPRMap.get(ColocationHelper - // .getLeaderRegion((PartitionedRegion)event.getRegion()).getFullPath()); - // PartitionedRegion prQ = null; int bucketId = -1; Object key = null; @@ -1071,11 +1003,10 @@ public class ParallelGatewaySenderQueue implements RegionQueue { } } else { String regionPath = event.getRegionPath(); - GemFireCacheImpl cache = (GemFireCacheImpl) this.sender.getCache(); + InternalCache cache = this.sender.getCache(); Region region = (PartitionedRegion) cache.getRegion(regionPath); if (region != null && !region.isDestroyed()) { - // TODO: Suranjan We have to get colocated parent region for this - // region + // TODO: We have to get colocated parent region for this region if (region instanceof DistributedRegion) { prQ = this.userRegionNameToshadowPRMap.get(region.getFullPath()); event.getBucketId(); @@ -1105,7 +1036,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue { private void destroyEventFromQueue(PartitionedRegion prQ, int bucketId, Object key) { boolean isPrimary = prQ.getRegionAdvisor().getBucketAdvisor(bucketId).isPrimary(); BucketRegionQueue brq = getBucketRegionQueueByBucketId(prQ, bucketId); - // TODO : Kishor : Make sure we dont need to initalize a bucket + // TODO : Make sure we dont need to initalize a bucket // before destroying a key from it try { if (brq != null) { @@ -1261,7 +1192,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue { } catch (InterruptedException e) { Thread.currentThread().interrupt(); } - blockProcesorThreadIfRequired(); + blockProcessorThreadIfRequired(); return batch; } @@ -1340,7 +1271,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue { this, batch.size(), size(), localSize()); } if (batch.size() == 0) { - blockProcesorThreadIfRequired(); + blockProcessorThreadIfRequired(); } return batch; } @@ -1400,10 +1331,9 @@ public class ParallelGatewaySenderQueue implements RegionQueue { } } - protected void blockProcesorThreadIfRequired() throws InterruptedException { + protected void blockProcessorThreadIfRequired() throws InterruptedException { queueEmptyLock.lock(); try { - // while (isQueueEmpty) { if (isQueueEmpty) { // merge44610: this if condition came from cheetah 44610 if (logger.isDebugEnabled()) { logger.debug("Going to wait, till notified."); @@ -1414,7 +1344,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue { // parameter but cedar does not have such corresponding method queueEmptyCondition.await(1000); // merge44610: this time waiting came from cheetah 44610 - // isQueueEmpty = this.localSize() == 0; } // update the flag so that next time when we come we will block. isQueueEmpty = this.localSizeForProcessor() == 0; @@ -1526,7 +1455,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue { throw new UnsupportedOperationException(); } - @Override public void remove(int batchSize) throws CacheException { for (int i = 0; i < batchSize; i++) { @@ -1596,14 +1524,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue { @Override public void close() { // Because of bug 49060 do not close the regions of a parallel queue - // for (Region r: getRegions()) { - // if (r != null && !r.isDestroyed()) { - // try { - // r.close(); - // } catch (RegionDestroyedException e) { - // } - // } - // } } /** @@ -1634,14 +1554,14 @@ public class ParallelGatewaySenderQueue implements RegionQueue { */ private volatile boolean shutdown = false; - private final GemFireCacheImpl cache; + private final InternalCache cache; private final ParallelGatewaySenderQueue parallelQueue; /** * Constructor : Creates and initializes the thread */ - public BatchRemovalThread(GemFireCacheImpl c, ParallelGatewaySenderQueue queue) { + public BatchRemovalThread(InternalCache c, ParallelGatewaySenderQueue queue) { super("BatchRemovalThread"); // TODO:REF: Name for this thread ? this.setDaemon(true); @@ -1772,7 +1692,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue { } } - private Set<InternalDistributedMember> getAllRecipients(GemFireCacheImpl cache, Map map) { + private Set<InternalDistributedMember> getAllRecipients(InternalCache cache, Map map) { Set recipients = new ObjectOpenHashSet(); for (Object pr : map.keySet()) { PartitionedRegion partitionedRegion = (PartitionedRegion) cache.getRegion((String) pr); @@ -1811,7 +1731,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue { AbstractGatewaySender sender = null; public ParallelGatewaySenderQueueMetaRegion(String regionName, RegionAttributes attrs, - LocalRegion parentRegion, GemFireCacheImpl cache, AbstractGatewaySender pgSender) { + LocalRegion parentRegion, InternalCache cache, AbstractGatewaySender pgSender) { super(regionName, attrs, parentRegion, cache, new InternalRegionArguments().setDestroyLockFlag(true).setRecreateFlag(false) .setSnapshotInputStream(null).setImageTarget(null) @@ -1872,8 +1792,8 @@ public class ParallelGatewaySenderQueue implements RegionQueue { } static class MetaRegionFactory { - ParallelGatewaySenderQueueMetaRegion newMetataRegion(GemFireCacheImpl cache, - final String prQName, final RegionAttributes ra, AbstractGatewaySender sender) { + ParallelGatewaySenderQueueMetaRegion newMetataRegion(InternalCache cache, final String prQName, + final RegionAttributes ra, AbstractGatewaySender sender) { ParallelGatewaySenderQueueMetaRegion meta = new ParallelGatewaySenderQueueMetaRegion(prQName, ra, null, cache, sender); return meta;
http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java index a7f224f..d79e2c1 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java @@ -38,6 +38,7 @@ import org.apache.geode.distributed.internal.PooledDistributionMessage; import org.apache.geode.internal.cache.AbstractBucketRegionQueue; import org.apache.geode.internal.cache.ForceReattemptException; import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.LocalRegion; import org.apache.geode.internal.cache.PartitionedRegion; import org.apache.geode.internal.cache.PartitionedRegionHelper; @@ -52,7 +53,6 @@ import org.apache.geode.internal.logging.log4j.LocalizedMessage; * * @since GemFire 8.0 */ - public class ParallelQueueRemovalMessage extends PooledDistributionMessage { private static final Logger logger = LogService.getLogger(); @@ -73,7 +73,7 @@ public class ParallelQueueRemovalMessage extends PooledDistributionMessage { @Override protected void process(DistributionManager dm) { final boolean isDebugEnabled = logger.isDebugEnabled(); - final GemFireCacheImpl cache; + final InternalCache cache; cache = GemFireCacheImpl.getInstance(); if (cache != null) { int oldLevel = LocalRegion.setThreadInitLevelRequirement(LocalRegion.BEFORE_INITIAL_IMAGE); http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java index 02baf81..7928662 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java @@ -54,7 +54,7 @@ import org.apache.geode.internal.cache.CachedDeserializable; import org.apache.geode.internal.cache.Conflatable; import org.apache.geode.internal.cache.DistributedRegion; import org.apache.geode.internal.cache.EntryEventImpl; -import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.InternalRegionArguments; import org.apache.geode.internal.cache.LocalRegion; import org.apache.geode.internal.cache.RegionQueue; @@ -72,7 +72,6 @@ import org.apache.geode.pdx.internal.PeerTypeRegistration; /** * @since GemFire 7.0 - * */ public class SerialGatewaySenderQueue implements RegionQueue { @@ -208,14 +207,12 @@ public class SerialGatewaySenderQueue implements RegionQueue { initializeRegion(abstractSender, listener); // Increment queue size. Fix for bug 51988. this.stats.incQueueSize(this.region.size()); - this.removalThread = new BatchRemovalThread((GemFireCacheImpl) abstractSender.getCache()); + this.removalThread = new BatchRemovalThread(abstractSender.getCache()); this.removalThread.start(); this.sender = abstractSender; if (logger.isDebugEnabled()) { logger.debug("{}: Contains {} elements", this, size()); } - - } public Region<Long, AsyncEvent> getRegion() { @@ -233,18 +230,8 @@ public class SerialGatewaySenderQueue implements RegionQueue { (r instanceof DistributedRegion && r.getName().equals(PeerTypeRegistration.REGION_NAME)); final boolean isWbcl = this.regionName.startsWith(AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX); if (!(isPDXRegion && isWbcl)) { - // TODO: Kishor : after merging this change. AsyncEventQueue test failed - // with data inconsistency. As of now going ahead with sync putandGetKey. - // Need to work on this during cedar - // if (this.keyPutNoSync) { - // putAndGetKeyNoSync(event); - // } - // else { - // synchronized (this) { putAndGetKey(event); return true; - // } - // } } return false; } @@ -366,26 +353,6 @@ public class SerialGatewaySenderQueue implements RegionQueue { // If we do want to support it then each caller needs // to call freeOffHeapResources and the returned GatewaySenderEventImpl throw new UnsupportedOperationException(); - // resetLastPeeked(); - // AsyncEvent object = peekAhead(); - // // If it is not null, destroy it and increment the head key - // if (object != null) { - // Long key = this.peekedIds.remove(); - // if (logger.isTraceEnabled()) { - // logger.trace("{}: Retrieved {} -> {}",this, key, object); - // } - // // Remove the entry at that key with a callback arg signifying it is - // // a WAN queue so that AbstractRegionEntry.destroy can get the value - // // even if it has been evicted to disk. In the normal case, the - // // AbstractRegionEntry.destroy only gets the value in the VM. - // this.region.destroy(key, RegionQueue.WAN_QUEUE_TOKEN); - // updateHeadKey(key.longValue()); - - // if (logger.isTraceEnabled()) { - // logger.trace("{}: Destroyed {} -> {}", this, key, object); - // } - // } - // return object; } public List<AsyncEvent> take(int batchSize) throws CacheException { @@ -393,20 +360,6 @@ public class SerialGatewaySenderQueue implements RegionQueue { // If we do want to support it then the callers // need to call freeOffHeapResources on each returned GatewaySenderEventImpl throw new UnsupportedOperationException(); - // List<AsyncEvent> batch = new ArrayList<AsyncEvent>( - // batchSize * 2); - // for (int i = 0; i < batchSize; i++) { - // AsyncEvent obj = take(); - // if (obj != null) { - // batch.add(obj); - // } else { - // break; - // } - // } - // if (logger.isTraceEnabled()) { - // logger.trace("{}: Took a batch of {} entries", this, batch.size()); - // } - // return batch; } /** @@ -442,7 +395,7 @@ public class SerialGatewaySenderQueue implements RegionQueue { boolean wasEmpty = this.lastDispatchedKey == this.lastDestroyedKey; this.lastDispatchedKey = key; if (wasEmpty) { - this.notify(); + notifyAll(); } if (logger.isDebugEnabled()) { @@ -468,7 +421,6 @@ public class SerialGatewaySenderQueue implements RegionQueue { } public Object peek() throws CacheException { - // resetLastPeeked(); Object object = peekAhead(); if (logger.isTraceEnabled()) { logger.trace("{}: Peeked {} -> {}", this, peekedIds, object); @@ -494,7 +446,6 @@ public class SerialGatewaySenderQueue implements RegionQueue { } List<AsyncEvent> batch = new ArrayList<AsyncEvent>(size * 2); // why // *2? - // resetLastPeeked(); while (batch.size() < size) { AsyncEvent object = peekAhead(); // Conflate here @@ -725,7 +676,6 @@ public class SerialGatewaySenderQueue implements RegionQueue { /** * Clear the list of peeked keys. The next peek will start again at the head key. - * */ public void resetLastPeeked() { this.peekedIds.clear(); @@ -736,7 +686,6 @@ public class SerialGatewaySenderQueue implements RegionQueue { * * @throws CacheException */ - private Long getCurrentKey() { long currentKey; if (this.peekedIds.isEmpty()) { @@ -775,7 +724,6 @@ public class SerialGatewaySenderQueue implements RegionQueue { return null; } - // It's important here that we check where the current key // is in relation to the tail key before we check to see if the // object exists. The reason is that the tail key is basically @@ -785,7 +733,7 @@ public class SerialGatewaySenderQueue implements RegionQueue { // If we check for the object, and then check the tail key, we could // skip objects. - // @todo don't do a get which updates the lru, instead just get the value + // TODO: don't do a get which updates the lru, instead just get the value // w/o modifying the LRU. // Note: getting the serialized form here (if it has overflowed to disk) // does not save anything since GatewayBatchOp needs to GatewayEventImpl @@ -969,7 +917,7 @@ public class SerialGatewaySenderQueue implements RegionQueue { */ @SuppressWarnings({"unchecked", "rawtypes"}) private void initializeRegion(AbstractGatewaySender sender, CacheListener listener) { - final GemFireCacheImpl gemCache = (GemFireCacheImpl) sender.getCache(); + final InternalCache gemCache = sender.getCache(); this.region = gemCache.getRegion(this.regionName); if (this.region == null) { AttributesFactory<Long, AsyncEvent> factory = new AttributesFactory<Long, AsyncEvent>(); @@ -992,11 +940,9 @@ public class SerialGatewaySenderQueue implements RegionQueue { factory.setEvictionAttributes(ea); factory.setConcurrencyChecksEnabled(false); - factory.setDiskStoreName(this.diskStoreName); - // TODO: Suranjan, can we do the following - // In case of persistence write to disk sync and in case of eviction - // write in async + + // In case of persistence write to disk sync and in case of eviction write in async factory.setDiskSynchronous(this.isDiskSynchronous); // Create the region @@ -1067,12 +1013,14 @@ public class SerialGatewaySenderQueue implements RegionQueue { */ private volatile boolean shutdown = false; - private final GemFireCacheImpl cache; + private final InternalCache cache; /** * Constructor : Creates and initializes the thread + * + * @param c */ - public BatchRemovalThread(GemFireCacheImpl c) { + public BatchRemovalThread(InternalCache c) { this.setDaemon(true); this.cache = c; } @@ -1213,7 +1161,7 @@ public class SerialGatewaySenderQueue implements RegionQueue { AbstractGatewaySender sender = null; protected SerialGatewaySenderQueueMetaRegion(String regionName, RegionAttributes attrs, - LocalRegion parentRegion, GemFireCacheImpl cache, AbstractGatewaySender sender) { + LocalRegion parentRegion, InternalCache cache, AbstractGatewaySender sender) { super(regionName, attrs, parentRegion, cache, new InternalRegionArguments().setDestroyLockFlag(true).setRecreateFlag(false) .setSnapshotInputStream(null).setImageTarget(null)
