http://git-wip-us.apache.org/repos/asf/geode/blob/640b9f03/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java index 0c967c9..4bdd67d 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java @@ -12,14 +12,54 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ - package org.apache.geode.internal.cache; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; + +import org.apache.logging.log4j.Logger; + import org.apache.geode.CancelException; import org.apache.geode.InternalGemFireError; import org.apache.geode.InvalidDeltaException; import org.apache.geode.SystemFailure; -import org.apache.geode.cache.*; +import org.apache.geode.cache.CacheClosedException; +import org.apache.geode.cache.CacheEvent; +import org.apache.geode.cache.CacheListener; +import org.apache.geode.cache.CacheLoader; +import org.apache.geode.cache.CacheLoaderException; +import org.apache.geode.cache.CacheWriter; +import org.apache.geode.cache.CacheWriterException; +import org.apache.geode.cache.DataPolicy; +import org.apache.geode.cache.DiskAccessException; +import org.apache.geode.cache.EntryNotFoundException; +import org.apache.geode.cache.LossAction; +import org.apache.geode.cache.MembershipAttributes; +import org.apache.geode.cache.Operation; +import org.apache.geode.cache.RegionAccessException; +import org.apache.geode.cache.RegionAttributes; +import org.apache.geode.cache.RegionDestroyedException; +import org.apache.geode.cache.RegionDistributionException; +import org.apache.geode.cache.RegionMembershipListener; +import org.apache.geode.cache.ResumptionAction; +import org.apache.geode.cache.RoleException; +import org.apache.geode.cache.TimeoutException; import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl; import org.apache.geode.cache.execute.Function; import org.apache.geode.cache.execute.FunctionException; @@ -31,9 +71,14 @@ import org.apache.geode.distributed.DistributedLockService; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.LockServiceDestroyedException; import org.apache.geode.distributed.Role; -import org.apache.geode.distributed.internal.*; +import org.apache.geode.distributed.internal.DM; +import org.apache.geode.distributed.internal.DistributionAdvisee; +import org.apache.geode.distributed.internal.DistributionAdvisor; import org.apache.geode.distributed.internal.DistributionAdvisor.Profile; import org.apache.geode.distributed.internal.DistributionAdvisor.ProfileVisitor; +import org.apache.geode.distributed.internal.DistributionConfig; +import org.apache.geode.distributed.internal.MembershipListener; +import org.apache.geode.distributed.internal.ReplyProcessor21; import org.apache.geode.distributed.internal.locks.DLockRemoteToken; import org.apache.geode.distributed.internal.locks.DLockService; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; @@ -45,9 +90,21 @@ import org.apache.geode.internal.cache.InitialImageOperation.GIIStatus; import org.apache.geode.internal.cache.RemoteFetchVersionMessage.FetchVersionResponse; import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceType; import org.apache.geode.internal.cache.control.MemoryEvent; -import org.apache.geode.internal.cache.execute.*; +import org.apache.geode.internal.cache.execute.DistributedRegionFunctionExecutor; +import org.apache.geode.internal.cache.execute.DistributedRegionFunctionResultSender; +import org.apache.geode.internal.cache.execute.DistributedRegionFunctionResultWaiter; +import org.apache.geode.internal.cache.execute.FunctionStats; +import org.apache.geode.internal.cache.execute.LocalResultCollector; +import org.apache.geode.internal.cache.execute.RegionFunctionContextImpl; +import org.apache.geode.internal.cache.execute.ServerToClientFunctionResultSender; import org.apache.geode.internal.cache.lru.LRUEntry; -import org.apache.geode.internal.cache.persistence.*; +import org.apache.geode.internal.cache.partitioned.Bucket; +import org.apache.geode.internal.cache.persistence.CreatePersistentRegionProcessor; +import org.apache.geode.internal.cache.persistence.PersistenceAdvisor; +import org.apache.geode.internal.cache.persistence.PersistenceAdvisorImpl; +import org.apache.geode.internal.cache.persistence.PersistentMemberID; +import org.apache.geode.internal.cache.persistence.PersistentMemberManager; +import org.apache.geode.internal.cache.persistence.PersistentMemberView; import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID; import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList; import org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException; @@ -63,21 +120,7 @@ import org.apache.geode.internal.offheap.annotations.Released; import org.apache.geode.internal.offheap.annotations.Retained; import org.apache.geode.internal.sequencelog.RegionLogger; import org.apache.geode.internal.util.concurrent.StoppableCountDownLatch; -import org.apache.logging.log4j.Logger; - -import java.io.IOException; -import java.io.InputStream; -import java.util.*; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -/** - * - */ @SuppressWarnings("deprecation") public class DistributedRegion extends LocalRegion implements CacheDistributionAdvisee { private static final Logger logger = LogService.getLogger(); @@ -91,17 +134,17 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA final CacheDistributionAdvisor distAdvisor; /** - * @guarded.By {@link #dlockMonitor} + * GuardedBy {@link #dlockMonitor} */ private DistributedLockService dlockService; - protected final AdvisorListener advisorListener = new AdvisorListener(); + final AdvisorListener advisorListener = new AdvisorListener(); /** Set of currently missing required roles */ - protected final HashSet missingRequiredRoles = new HashSet(); + final HashSet missingRequiredRoles = new HashSet(); // package-private to avoid synthetic accessor /** True if this region is currently missing any required roles */ - protected volatile boolean isMissingRequiredRoles = false; + private volatile boolean isMissingRequiredRoles = false; /** * True if this region is has any required roles defined and the LossAction is either NO_ACCESS or @@ -113,7 +156,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA * Latch that is opened after initialization waits for required roles up to the * <a href="DistributedSystem#member-timeout">member-timeout </a>. */ - protected final StoppableCountDownLatch initializationLatchAfterMemberTimeout; + private final StoppableCountDownLatch initializationLatchAfterMemberTimeout; private final PersistenceAdvisor persistenceAdvisor; @@ -134,11 +177,11 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA */ private final Object clearLock = new Object(); - private static AtomicBoolean loggedNetworkPartitionWarning = new AtomicBoolean(false); + private static final AtomicBoolean loggedNetworkPartitionWarning = new AtomicBoolean(false); /** Creates a new instance of DistributedRegion */ protected DistributedRegion(String regionName, RegionAttributes attrs, LocalRegion parentRegion, - GemFireCacheImpl cache, InternalRegionArguments internalRegionArgs) { + InternalCache cache, InternalRegionArguments internalRegionArgs) { super(regionName, attrs, parentRegion, cache, internalRegionArgs); this.initializationLatchAfterMemberTimeout = new StoppableCountDownLatch(getCancelCriterion(), 1); @@ -196,20 +239,19 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA diskStats = null; } PersistentMemberManager memberManager = cache.getPersistentMemberManager(); - this.persistenceAdvisor = new PersistenceAdvisorImpl(distAdvisor, dl, storage, + this.persistenceAdvisor = new PersistenceAdvisorImpl(this.distAdvisor, dl, storage, this.getFullPath(), diskStats, memberManager); - } catch (Exception e) { + } catch (Exception ignore) { // TODO: wrap exception in throw throw new InternalGemFireError("Couldn't recover persistence"); } } else { this.persistenceAdvisor = null; } if (this.persistenceAdvisor != null) { - this.persistentId = persistenceAdvisor.generatePersistentID(); + this.persistentId = this.persistenceAdvisor.generatePersistentID(); } else { this.persistentId = null; } - } @Override @@ -225,10 +267,8 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA */ protected CacheDistributionAdvisor createDistributionAdvisor( InternalRegionArguments internalRegionArgs) { - return CacheDistributionAdvisor.createCacheDistributionAdvisor(this); // Warning: potential - // early escape of object - // before full - // construction + // Warning: potential early escape of object before full construction + return CacheDistributionAdvisor.createCacheDistributionAdvisor(this); } /** @@ -256,14 +296,11 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA if (!this.generateVersionTag) { return true; } - return this.concurrencyChecksEnabled && (this.srp == null) && !isTX() + return this.concurrencyChecksEnabled && (this.serverRegionProxy == null) && !isTX() && this.scope.isDistributed() && !this.dataPolicy.withReplication(); } - /** - * @see LocalRegion#virtualPut(EntryEventImpl, boolean, boolean, Object, boolean, long, boolean) - */ @Override protected boolean virtualPut(EntryEventImpl event, boolean ifNew, boolean ifOld, Object expectedOldValue, boolean requireOldValue, long lastModified, @@ -276,8 +313,8 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA !event.isNetSearch() && // search and load processor handles own locking !event.isNetLoad() && // @todo darrel/kirk: what about putAll? - !event.isLocalLoad() && !event.isSingleHopPutOp()) { // Single Hop Op means dlock is already - // taken at origin node. + !event.isLocalLoad() && !event.isSingleHopPutOp()) { + // Single Hop Op means dlock is already taken at origin node. dlock = this.getDistributedLockIfGlobal(event.getKey()); } if (isTraceEnabled) { @@ -332,10 +369,10 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA logger.trace("DR.virtualPut: this cache has already seen this event {}", event); } - // Gester, Fix 39014: when hasSeenEvent, put will still distribute + // Fix 39014: when hasSeenEvent, put will still distribute // event, but putAll did not. We add the logic back here, not to put // back into DR.distributeUpdate() because we moved this part up into - // LR.basicPutPart3 in purpose. Reviewed by Bruce. + // LR.basicPutPart3 in purpose. if (event.isBulkOpInProgress() && !event.isOriginRemote()) { event.getPutAllOperation().addEntry(event, true); } @@ -409,7 +446,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA event.getRemoveAllOperation().addEntry(event); } else { basicDestroy(event, true, null); - // getSharedDataView().destroyExistingEntry(event, true, null); } } @@ -448,7 +484,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA } } - protected void setGeneratedVersionTag(boolean generateVersionTag) { + void setGeneratedVersionTag(boolean generateVersionTag) { // there is at-least one other persistent member, so turn on concurrencyChecks enableConcurrencyChecks(); @@ -470,7 +506,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA || !this.generateVersionTag) { return false; } - if (this.srp != null) { // client + if (this.serverRegionProxy != null) { // client return false; } if (event.getVersionTag() != null && !event.getVersionTag().isGatewayTag()) { @@ -513,8 +549,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA Set roles = Collections.unmodifiableSet(new HashSet(this.missingRequiredRoles)); throw new RegionAccessException( LocalizedStrings.DistributedRegion_OPERATION_IS_DISALLOWED_BY_LOSSACTION_0_BECAUSE_THESE_REQUIRED_ROLES_ARE_MISSING_1 - .toLocalizedString( - new Object[] {getMembershipAttributes().getLossAction(), roles}), + .toLocalizedString(getMembershipAttributes().getLossAction(), roles), getFullPath(), roles); } } @@ -540,8 +575,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA Assert.assertTrue(!roles.isEmpty()); throw new RegionAccessException( LocalizedStrings.DistributedRegion_OPERATION_IS_DISALLOWED_BY_LOSSACTION_0_BECAUSE_THESE_REQUIRED_ROLES_ARE_MISSING_1 - .toLocalizedString( - new Object[] {getMembershipAttributes().getLossAction(), roles}), + .toLocalizedString(getMembershipAttributes().getLossAction(), roles), getFullPath(), roles); } } @@ -550,30 +584,30 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA @Override protected void handleReliableDistribution(Set successfulRecipients) { - handleReliableDistribution(successfulRecipients, Collections.EMPTY_SET, Collections.EMPTY_SET); + handleReliableDistribution(successfulRecipients, Collections.emptySet(), + Collections.emptySet()); } - protected void handleReliableDistribution(Set successfulRecipients, Set otherRecipients1, + private void handleReliableDistribution(Set successfulRecipients, Set otherRecipients1, Set otherRecipients2) { if (this.requiresReliabilityCheck) { MembershipAttributes ra = getMembershipAttributes(); - Set recipients = successfulRecipients; // determine the successful roles Set roles = new HashSet(); - for (Iterator iter = recipients.iterator(); iter.hasNext();) { - InternalDistributedMember mbr = (InternalDistributedMember) iter.next(); + for (Object successfulRecipient : successfulRecipients) { + InternalDistributedMember mbr = (InternalDistributedMember) successfulRecipient; if (mbr != null) { roles.addAll(mbr.getRoles()); } } - for (Iterator iter = otherRecipients1.iterator(); iter.hasNext();) { - InternalDistributedMember mbr = (InternalDistributedMember) iter.next(); + for (Object anOtherRecipients1 : otherRecipients1) { + InternalDistributedMember mbr = (InternalDistributedMember) anOtherRecipients1; if (mbr != null) { roles.addAll(mbr.getRoles()); } } - for (Iterator iter = otherRecipients2.iterator(); iter.hasNext();) { - InternalDistributedMember mbr = (InternalDistributedMember) iter.next(); + for (Object anOtherRecipients2 : otherRecipients2) { + InternalDistributedMember mbr = (InternalDistributedMember) anOtherRecipients2; if (mbr != null) { roles.addAll(mbr.getRoles()); } @@ -581,22 +615,18 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA // determine the missing roles Set failedRoles = new HashSet(ra.getRequiredRoles()); failedRoles.removeAll(roles); - if (failedRoles.isEmpty()) + if (failedRoles.isEmpty()) { return; - // if (rp.isAllAccessWithQueuing()) { - // this.rmq.add(data, failedRoles); - // } else { + } throw new RegionDistributionException( LocalizedStrings.DistributedRegion_OPERATION_DISTRIBUTION_MAY_HAVE_FAILED_TO_NOTIFY_THESE_REQUIRED_ROLES_0 .toLocalizedString(failedRoles), getFullPath(), failedRoles); - // } } } /** - * * Called when we do a distributed operation and don't have anyone to distributed it too. Since * this is only called when no distribution was done (i.e. no recipients) we do not check * isMissingRequiredRoles because it might not longer be true due to race conditions @@ -607,18 +637,14 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA * LIMITED_ACCESS. * @since GemFire 5.0 */ - protected boolean isNoDistributionOk() { + boolean isNoDistributionOk() { if (this.requiresReliabilityCheck) { MembershipAttributes ra = getMembershipAttributes(); - // if (ra.getLossAction().isAllAccessWithQueuing()) { - // return !ra.hasRequiredRoles(); - // } else { - Set failedRoles = ra.getRequiredRoles(); + Set<Role> failedRoles = ra.getRequiredRoles(); throw new RegionDistributionException( LocalizedStrings.DistributedRegion_OPERATION_DISTRIBUTION_WAS_NOT_DONE_TO_THESE_REQUIRED_ROLES_0 .toLocalizedString(failedRoles), getFullPath(), failedRoles); - // } } return true; } @@ -633,76 +659,11 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA return false; } - @Override public boolean shouldSyncForCrashedMember(InternalDistributedMember id) { return !doesNotDistribute() && super.shouldSyncForCrashedMember(id); } - - /** - * Adjust the specified set of recipients by removing any of them that are currently having their - * data queued. - * - * @param recipients the set of recipients that a message is to be distributed too. Recipients - * that are currently having their data queued will be removed from this set. - * @return the set, possibly null, of recipients that are currently having their data queued. - * @since GemFire 5.0 - */ - protected Set adjustForQueuing(Set recipients) { - Set result = null; - // if (this.requiresReliabilityCheck) { - // MembershipAttributes ra = getMembershipAttributes(); - // if (ra.getLossAction().isAllAccessWithQueuing()) { - // Set currentQueuedRoles = this.rmq.getQueuingRoles(); - // if (currentQueuedRoles != null) { - // // foreach recipient see if any of his roles are queued and if - // // they are remove him from recipients and add him to result - // Iterator it = recipients.iterator(); - // while (it.hasNext()) { - // DistributedMember dm = (DistributedMember)it.next(); - // Set dmRoles = dm.getRoles(); - // if (!dmRoles.isEmpty()) { - // if (intersects(dmRoles, currentQueuedRoles)) { - // it.remove(); // fix for bug 34447 - // if (result == null) { - // result = new HashSet(); - // } - // result.add(dm); - // } - // } - // } - // } - // } - // } - return result; - } - - /** - * Returns true if the two sets intersect - * - * @param a a non-null non-empty set - * @param b a non-null non-empty set - * @return true if sets a and b intersect; false if not - * @since GemFire 5.0 - */ - public static boolean intersects(Set a, Set b) { - Iterator it; - Set target; - if (a.size() <= b.size()) { - it = a.iterator(); - target = b; - } else { - it = b.iterator(); - target = a; - } - while (it.hasNext()) { - if (target.contains(it.next())) - return true; - } - return false; - } - @Override public boolean requiresReliabilityCheck() { return this.requiresReliabilityCheck; @@ -736,7 +697,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA * * @return true if asynchronous resumption is triggered */ - protected boolean resumeReliability(InternalDistributedMember id, Set newlyAcquiredRoles) { + private boolean resumeReliability(InternalDistributedMember id, Set newlyAcquiredRoles) { boolean async = false; try { ResumptionAction ra = getMembershipAttributes().getResumptionAction(); @@ -763,6 +724,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA final Set newlyAcquiredRoles) throws RejectedExecutionException { final ResumptionAction ra = getMembershipAttributes().getResumptionAction(); getDistributionManager().getWaitingThreadPool().execute(new Runnable() { + @Override public void run() { try { if (ra.isReinitialize()) { @@ -834,8 +796,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA * * @return true if asynchronous resumption is triggered */ - protected boolean lostReliability(final InternalDistributedMember id, - final Set newlyMissingRoles) { + private boolean lostReliability(final InternalDistributedMember id, final Set newlyMissingRoles) { if (DistributedRegion.ignoreReconnect) { // test hook return false; } @@ -844,7 +805,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA try { if (getMembershipAttributes().getLossAction().isReconnect()) { async = true; - if (isInitializingThread) { + if (this.isInitializingThread) { doLostReliability(true, id, newlyMissingRoles); } else { doLostReliability(false, id, newlyMissingRoles); @@ -852,18 +813,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA // we don't do this in the waiting pool because we're going to // disconnect // the distributed system, and it will wait for the pool to empty - /* - * moved to a new method called doLostReliablity. Thread t = new - * Thread("Reconnect Distributed System") { public void run() { try { // TODO: may need to - * check isReconnecting and checkReadiness... initializationLatchAfterMemberTimeout.await(); - * // TODO: call reconnect here getSystem().tryReconnect((GemFireCache)getCache()); // added - * for reconnect. synchronized (missingRequiredRoles) { // any number of threads may be - * waiting on missingRequiredRoles missingRequiredRoles.notifyAll(); // need to fire an - * event if id is not null if (hasListener() && id != null) { RoleEventImpl relEvent = new - * RoleEventImpl( DistributedRegion.this, Operation.CACHE_RECONNECT, null, true, id, - * newlyMissingRoles); dispatchListenerEvent( EnumListenerEvent.AFTER_ROLE_LOSS, relEvent); - * } } } catch (Exception e) { } } }; t.setDaemon(true); t.start(); - */ } } catch (CancelException cce) { throw cce; @@ -879,7 +828,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA try { if (!isInitializing) { // moved code to a new thread. - Thread t = new Thread( + Thread thread = new Thread( LocalizedStrings.DistributedRegion_RECONNECT_DISTRIBUTED_SYSTEM.toLocalizedString()) { @Override public void run() { @@ -907,15 +856,15 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA } } }; - t.setDaemon(true); - t.start(); + thread.setDaemon(true); + thread.start(); } else { getSystem().tryReconnect(false, "Role Loss", getCache()); // added for // reconnect. - synchronized (missingRequiredRoles) { + synchronized (this.missingRequiredRoles) { // any number of threads may be waiting on missingRequiredRoles - missingRequiredRoles.notifyAll(); + this.missingRequiredRoles.notifyAll(); // need to fire an event if id is not null if (hasListener() && id != null) { RoleEventImpl relEvent = new RoleEventImpl(DistributedRegion.this, @@ -923,10 +872,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA dispatchListenerEvent(EnumListenerEvent.AFTER_ROLE_LOSS, relEvent); } } - // } catch (CancelException cce){ - - // } - } } catch (CancelException ignor) { throw ignor; @@ -934,12 +879,11 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA logger.fatal(LocalizedMessage.create(LocalizedStrings.DistributedRegion_UNEXPECTED_EXCEPTION), e); } - } - protected void lockCheckReadiness() { + void lockCheckReadiness() { // package-private to avoid synthetic accessor // fix for bug 32610 - cache.getCancelCriterion().checkCancelInProgress(null); + this.cache.getCancelCriterion().checkCancelInProgress(null); checkReadiness(); } @@ -956,9 +900,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA } } - /** - * @see LocalRegion#localDestroyNoCallbacks(Object) - */ @Override public void localDestroyNoCallbacks(Object key) { super.localDestroyNoCallbacks(key); @@ -970,9 +911,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA } } - /** - * @see LocalRegion#localDestroy(Object, Object) - */ @Override public void localDestroy(Object key, Object aCallbackArgument) throws EntryNotFoundException { super.localDestroy(key, aCallbackArgument); @@ -984,9 +922,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA } } - /** - * @see LocalRegion#invalidate(Object, Object) - */ @Override public void invalidate(Object key, Object aCallbackArgument) throws TimeoutException, EntryNotFoundException { @@ -996,7 +931,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA checkForLimitedOrNoAccess(); Lock dlock = this.getDistributedLockIfGlobal(key); try { - super.validatedInvalidate(key, aCallbackArgument); + validatedInvalidate(key, aCallbackArgument); } finally { if (dlock != null) dlock.unlock(); @@ -1037,8 +972,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA * Called while NOT holding lock on parent's subregions * * @throws IllegalStateException if region is not compatible with a region in another VM. - * - * @see LocalRegion#initialize(InputStream, InternalDistributedMember, InternalRegionArguments) */ @Override protected void initialize(InputStream snapshotInputStream, InternalDistributedMember imageTarget, @@ -1060,7 +993,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA try { try { - PersistentMemberID persistentId = null; + PersistentMemberID persistentMemberId = null; boolean recoverFromDisk = isRecoveryNeeded(); DiskRegion dskRgn = getDiskRegion(); if (recoverFromDisk) { @@ -1071,7 +1004,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA if (logger.isDebugEnabled()) { logger.debug("DistributedRegion.getInitialImageAndRecovery: Finished Recovery"); } - persistentId = dskRgn.getMyPersistentID(); + persistentMemberId = dskRgn.getMyPersistentID(); } // Create OQL indexes before starting GII. @@ -1079,7 +1012,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA if (getDataPolicy().withReplication() || getDataPolicy().withPreloaded()) { getInitialImageAndRecovery(snapshotInputStream, imageTarget, internalRegionArgs, - recoverFromDisk, persistentId); + recoverFromDisk, persistentMemberId); } else { new CreateRegionProcessor(this).initializeRegion(); if (snapshotInputStream != null) { @@ -1093,9 +1026,9 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA } initMembershipRoles(); - isInitializingThread = false; - super.initialize(null, null, null); // makes sure all latches are released if they haven't - // been already + this.isInitializingThread = false; + // makes sure all latches are released if they haven't been already + super.initialize(null, null, null); } finally { if (this.eventTracker != null) { this.eventTracker.setInitialized(); @@ -1114,40 +1047,9 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA /** * A reference counter to protected the memoryThresholdReached boolean */ - private final Set<DistributedMember> memoryThresholdReachedMembers = - new CopyOnWriteArraySet<DistributedMember>(); - - /** Sets and returns giiMissingRequiredRoles */ - private boolean checkInitialImageForReliability(InternalDistributedMember imageTarget, - CacheDistributionAdvisor.InitialImageAdvice advice) { - // assumption: required roles are interesting to GII only if Reinitialize... - // if (true) - return false; - // if (getMembershipAttributes().hasRequiredRoles() - // && getMembershipAttributes().getResumptionAction().isReinitialize()) { - // // are any required roles missing for GII with Reinitialize? - // Set missingRR = new HashSet(getMembershipAttributes().getRequiredRoles()); - // missingRR.removeAll(getSystem().getDistributedMember().getRoles()); - // for (Iterator iter = advice.replicates.iterator(); iter.hasNext();) { - // DistributedMember member = (DistributedMember)iter.next(); - // missingRR.removeAll(member.getRoles()); - // } - // for (Iterator iter = advice.others.iterator(); iter.hasNext();) { - // DistributedMember member = (DistributedMember)iter.next(); - // missingRR.removeAll(member.getRoles()); - // } - // for (Iterator iter = advice.preloaded.iterator(); iter.hasNext();) { - // DistributedMember member = (DistributedMember)iter.next(); - // missingRR.removeAll(member.getRoles()); - // } - // if (!missingRR.isEmpty()) { - // // entering immediate loss condition, which will cause reinit on resume - // this.giiMissingRequiredRoles = true; - // } - // } - // return this.giiMissingRequiredRoles; - } + private final Set<DistributedMember> memoryThresholdReachedMembers = new CopyOnWriteArraySet<>(); + // TODO: cleanup getInitialImageAndRecovery private void getInitialImageAndRecovery(InputStream snapshotInputStream, InternalDistributedMember imageSrc, InternalRegionArguments internalRegionArgs, boolean recoverFromDisk, PersistentMemberID persistentId) throws TimeoutException { @@ -1158,17 +1060,16 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA imgState.init(); boolean targetRecreated = internalRegionArgs.getRecreateFlag(); Boolean isCBool = (Boolean) isConversion.get(); - boolean isForConversion = isCBool != null ? isCBool.booleanValue() : false; + boolean isForConversion = isCBool != null ? isCBool : false; if (recoverFromDisk && snapshotInputStream != null && !isForConversion) { throw new InternalGemFireError( LocalizedStrings.DistributedRegion_IF_LOADING_A_SNAPSHOT_THEN_SHOULD_NOT_BE_RECOVERING_ISRECOVERING_0_SNAPSHOTSTREAM_1 - .toLocalizedString( - new Object[] {Boolean.valueOf(recoverFromDisk), snapshotInputStream})); + .toLocalizedString(new Object[] {true, snapshotInputStream})); } ProfileExchangeProcessor targetProvider; - if (dataPolicy.withPersistence()) { + if (this.dataPolicy.withPersistence()) { targetProvider = new CreatePersistentRegionProcessor(this, getPersistenceAdvisor(), recoverFromDisk); } else { @@ -1177,15 +1078,15 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA } imgState.setInRecovery(false); RegionVersionVector recovered_rvv = null; - if (dataPolicy.withPersistence()) { - recovered_rvv = (this.getVersionVector() == null ? null - : this.getVersionVector().getCloneForTransmission()); + if (this.dataPolicy.withPersistence()) { + recovered_rvv = this.getVersionVector() == null ? null + : this.getVersionVector().getCloneForTransmission(); } // initializeRegion will send out our profile targetProvider.initializeRegion(); - if (persistenceAdvisor != null) { - persistenceAdvisor.initialize(); + if (this.persistenceAdvisor != null) { + this.persistenceAdvisor.initialize(); } // Register listener here so that the remote members are known @@ -1193,7 +1094,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA // remote members if (!isInternalRegion()) { if (!this.isDestroyed) { - cache.getInternalResourceManager().addResourceListener(ResourceType.MEMORY, this); + this.cache.getInternalResourceManager().addResourceListener(ResourceType.MEMORY, this); } } @@ -1212,9 +1113,9 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA } loadSnapshotDuringInitialization(snapshotInputStream); } catch (IOException e) { - throw new RuntimeException(e); // @todo change this exception? + throw new RuntimeException(e); // TODO: change this exception? } catch (ClassNotFoundException e) { - throw new RuntimeException(e); // @todo change this exception? + throw new RuntimeException(e); // TODO: change this exception? } cleanUpDestroyedTokensAndMarkGIIComplete(GIIStatus.NO_GII); return; @@ -1227,25 +1128,11 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA // or not) InitialImageOperation iiop = new InitialImageOperation(this, this.entries); - // [defunct] Special case GII for PR admin regions (which are always - // replicates and always writers - // bruce: this was commented out after adding the GIIAckRequest logic to - // force - // consistency before the gii operation begins - // if (isUsedForPartitionedRegionAdmin() || - // isUsedForPartitionedRegionBucket()) { - // releaseBeforeGetInitialImageLatch(); - // iiop.getFromAll(this.distAdvisor.adviseGeneric(), false); - // cleanUpDestroyedTokens(); - // return; - // } - CacheDistributionAdvisor.InitialImageAdvice advice = null; boolean done = false; while (!done && !isDestroyed()) { advice = targetProvider.getInitialImageAdvice(advice); - checkInitialImageForReliability(imageSrc, advice); boolean attemptGetFromOne = imageSrc != null // we were given a specific member || this.dataPolicy.withPreloaded() && !advice.preloaded.isEmpty() // this is a preloaded // region @@ -1331,12 +1218,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA } } - /* - * no more union GII // do union getInitialImage Set rest = new HashSet(); - * rest.addAll(advice.others); rest.addAll(advice.preloaded); // push profile w/ recovery - * flag turned off at same time that we // do a union getInitialImage boolean pushProfile - * = recoverFromDisk; iiop.getFromAll(rest, pushProfile); - */ cleanUpDestroyedTokensAndMarkGIIComplete(GIIStatus.NO_GII); done = true; return; @@ -1344,13 +1225,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA break; } } - - return; - } - - private void synchronizeWith(InternalDistributedMember target, VersionSource idToRecover) { - InitialImageOperation op = new InitialImageOperation(this, this.entries); - op.synchronizeWith(target, idToRecover, null); } /** @@ -1359,7 +1233,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA */ public void synchronizeForLostMember(InternalDistributedMember lostMember, VersionSource lostVersionID) { - if (this.concurrencyChecksEnabled == false) { + if (!this.concurrencyChecksEnabled) { return; } CacheDistributionAdvisor advisor = getCacheDistributionAdvisor(); @@ -1379,11 +1253,8 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA op.synchronizeWith(target, versionMember, lostMember); } - /** - * invoked just before an initial image is requested from another member - */ /** remove any partial entries received in a failed GII */ - protected void cleanUpAfterFailedGII(boolean recoverFromDisk) { + void cleanUpAfterFailedGII(boolean recoverFromDisk) { DiskRegion dskRgn = getDiskRegion(); // if we have a persistent region, instead of deleting everything on disk, // we will just reset the "recovered from disk" flag. After @@ -1428,8 +1299,8 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA // remove all the roles we are playing since they will never be // missing this.missingRequiredRoles.removeAll(getSystem().getDistributedMember().getRoles()); - for (Iterator iter = others.iterator(); iter.hasNext();) { - DistributedMember other = (DistributedMember) iter.next(); + for (Object other1 : others) { + DistributedMember other = (DistributedMember) other1; this.missingRequiredRoles.removeAll(other.getRoles()); } } @@ -1445,7 +1316,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA try { if (this.giiMissingRequiredRoles) { // force reliability loss and possibly resumption - isInitializingThread = true; + this.isInitializingThread = true; synchronized (this.advisorListener) { synchronized (this.missingRequiredRoles) { // forcing state of loss because of bad GII @@ -1462,7 +1333,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA logger.debug("GetInitialImage had missing required roles."); } // TODO: will this work with RECONNECT and REINITIALIZE? - isInitializingThread = true; + this.isInitializingThread = true; lostReliability(null, null); if (this.missingRequiredRoles.isEmpty()) { // all required roles are present so force resumption @@ -1477,7 +1348,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA // pur code to increment the stats. boolean async = resumeReliability(null, null); if (async) { - advisorListener.destroyed = true; + this.advisorListener.destroyed = true; } } } @@ -1508,7 +1379,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA logger.debug("Initialization completed with missing required roles: {}", this.missingRequiredRoles); } - isInitializingThread = true; + this.isInitializingThread = true; lostReliability(null, null); } } @@ -1516,10 +1387,10 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA } } catch (RegionDestroyedException ignore) { // ignore to fix bug 34639 may be thrown by waitForRequiredRoles - } catch (CancelException ignore) { + } catch (CancelException e) { // ignore to fix bug 34639 may be thrown by waitForRequiredRoles - if (isInitializingThread) { - throw ignore; + if (this.isInitializingThread) { + throw e; } } catch (Exception e) { logger.fatal( @@ -1538,7 +1409,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA // called by InitialImageOperation to clean up destroyed tokens // release afterGetInitialImageInitializationLatch before unlocking // cleanUpLock - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "UL_UNRELEASED_LOCK") + @SuppressWarnings("UL_UNRELEASED_LOCK") protected void cleanUpDestroyedTokensAndMarkGIIComplete(GIIStatus giiStatus) { // We need to clean up the disk before we release the after get initial image latch DiskRegion dskRgn = getDiskRegion(); @@ -1564,8 +1435,8 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA clearEntries(rvv); } // need to do this before we release the afterGetInitialImageLatch - if (persistenceAdvisor != null) { - persistenceAdvisor.setOnline(GIIStatus.didGII(giiStatus), false, getPersistentID()); + if (this.persistenceAdvisor != null) { + this.persistenceAdvisor.setOnline(GIIStatus.didGII(giiStatus), false, getPersistentID()); } } finally { // release after gii lock first so basicDestroy will see isInitialized() @@ -1588,14 +1459,10 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA LocalizedStrings.DistributedRegion_INITIALIZING_REGION_COMPLETED_0, this.getName())); } - /** - * @see LocalRegion#basicDestroy(EntryEventImpl, boolean, Object) - */ @Override protected void basicDestroy(EntryEventImpl event, boolean cacheWrite, Object expectedOldValue) throws EntryNotFoundException, CacheWriterException, TimeoutException { // disallow local destruction for mirrored keysvalues regions - boolean invokeWriter = cacheWrite; boolean hasSeen = false; if (hasSeenEvent(event)) { hasSeen = true; @@ -1603,11 +1470,12 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA checkIfReplicatedAndLocalDestroy(event); try { + boolean invokeWriter = cacheWrite; if (this.requiresOneHopForMissingEntry(event)) { // bug #45704: see if a one-hop must be done for this operation RegionEntry re = getRegionEntry(event.getKey()); if (re == null /* || re.isTombstone() */ || !this.generateVersionTag) { - if (this.srp == null) { + if (this.serverRegionProxy == null) { // only assert for non-client regions. Assert.assertTrue(!this.dataPolicy.withReplication() || !this.generateVersionTag); } @@ -1651,7 +1519,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA } } - return; } finally { if (hasSeen) { if (event.isBulkOpInProgress() && !event.isOriginRemote()) { @@ -1699,10 +1566,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA return evictDestroyWasDone; } - - /** - * @see LocalRegion#basicInvalidateRegion(RegionEventImpl) - */ @Override void basicInvalidateRegion(RegionEventImpl event) { // disallow local invalidation for replicated regions @@ -1721,7 +1584,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA * decide if InvalidateRegionOperation should be sent to peers. broken out so that BucketRegion * can override * - * @param event * @return true if {@link InvalidateRegionOperation} should be distributed, false otherwise */ protected boolean shouldDistributeInvalidateRegion(RegionEventImpl event) { @@ -1738,9 +1600,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA new InvalidateRegionOperation(event).distribute(); } - /** - * @see LocalRegion#basicDestroyRegion(RegionEventImpl, boolean, boolean, boolean) - */ @Override void basicDestroyRegion(RegionEventImpl event, boolean cacheWrite, boolean lock, boolean callbackEvents) throws CacheWriterException, TimeoutException { @@ -1749,7 +1608,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA // when another member concurrently creates this region. See bug 42051. boolean isClose = event.getOperation().isClose(); if (!isClose) { - cache.beginDestroy(path, this); + this.cache.beginDestroy(path, this); } try { super.basicDestroyRegion(event, cacheWrite, lock, callbackEvents); @@ -1772,17 +1631,16 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA } } finally { if (!isClose) { - cache.endDestroy(path, this); + this.cache.endDestroy(path, this); } RegionLogger.logDestroy(path, getMyId(), getPersistentID(), isClose); } } - @Override protected void distributeDestroyRegion(RegionEventImpl event, boolean notifyOfRegionDeparture) { - if (persistenceAdvisor != null) { - persistenceAdvisor.releaseTieLock(); + if (this.persistenceAdvisor != null) { + this.persistenceAdvisor.releaseTieLock(); } new DestroyRegionOperation(event, notifyOfRegionDeparture).distribute(); } @@ -1790,16 +1648,14 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA /** * Return true if invalidation occurred; false if it did not, for example if it was already * invalidated - * - * @see LocalRegion#basicInvalidate(EntryEventImpl) */ @Override void basicInvalidate(EntryEventImpl event) throws EntryNotFoundException { - boolean hasSeen = false; if (hasSeenEvent(event)) { hasSeen = true; } + try { // disallow local invalidation for replicated regions if (event.isLocalInvalid() && !event.getOperation().isLocal() && getScope().isDistributed() @@ -1812,7 +1668,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA // bug #45704: see if a one-hop must be done for this operation RegionEntry re = getRegionEntry(event.getKey()); if (re == null/* || re.isTombstone() */ || !this.generateVersionTag) { - if (this.srp == null) { + if (this.serverRegionProxy == null) { // only assert for non-client regions. Assert.assertTrue(!this.dataPolicy.withReplication() || !this.generateVersionTag); } @@ -1838,7 +1694,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA super.basicInvalidate(event); - return; } finally { if (hasSeen) { if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) { @@ -1871,26 +1726,25 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA @Override void basicUpdateEntryVersion(EntryEventImpl event) throws EntryNotFoundException { - LocalRegion lr = event.getLocalRegion(); - AbstractRegionMap arm = ((AbstractRegionMap) lr.getRegionMap()); + LocalRegion localRegion = event.getLocalRegion(); + AbstractRegionMap regionMap = (AbstractRegionMap) localRegion.getRegionMap(); try { - arm.lockForCacheModification(lr, event); + regionMap.lockForCacheModification(localRegion, event); try { if (!hasSeenEvent(event)) { super.basicUpdateEntryVersion(event); } - return; } finally { if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) { distributeUpdateEntryVersion(event); } } } finally { - arm.releaseCacheModificationLock(lr, event); + regionMap.releaseCacheModificationLock(localRegion, event); } } - protected void distributeUpdateEntryVersion(EntryEventImpl event) { + void distributeUpdateEntryVersion(EntryEventImpl event) { if (!this.regionInvalid && event.isDistributed() && !event.isOriginRemote() && !isTX() /* only distribute if non-tx */) { if (event.isDistributed() && !event.isOriginRemote()) { @@ -1902,10 +1756,10 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA } @Override - protected void basicClear(RegionEventImpl ev) { + protected void basicClear(RegionEventImpl regionEvent) { Lock dlock = this.getRegionDistributedLockIfGlobal(); try { - super.basicClear(ev); + super.basicClear(regionEvent); } finally { if (dlock != null) dlock.unlock(); @@ -1919,7 +1773,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA do { // non-replicate regions must defer to a replicate for clear/invalidate of region Set<InternalDistributedMember> repls = this.distAdvisor.adviseReplicates(); - if (repls.size() > 0) { + if (!repls.isEmpty()) { InternalDistributedMember mbr = repls.iterator().next(); RemoteRegionOperation op = RemoteRegionOperation.clear(mbr, this); try { @@ -1949,7 +1803,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA // to suspend locking, which is what distributedLockForClear() does. We don't // want that to happen, so we'll synchronize to make sure only one thread on // this member performs a clear. - synchronized (clearLock) { + synchronized (this.clearLock) { if (enableRVV) { distributedLockForClear(); @@ -2016,9 +1870,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA /** * obtain locks preventing generation of new versions in other members - * - * @param participants - **/ + */ private void obtainWriteLocksForClear(RegionEventImpl regionEvent, Set<InternalDistributedMember> participants) { lockLocallyForClear(getDistributionManager(), getMyId(), regionEvent); @@ -2029,12 +1881,13 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA * pause local operations so that a clear() can be performed and flush comm channels to the given * member */ - public void lockLocallyForClear(DM dm, InternalDistributedMember locker, CacheEvent event) { + void lockLocallyForClear(DM dm, InternalDistributedMember locker, CacheEvent event) { RegionVersionVector rvv = getVersionVector(); - ARMLockTestHook alth = getRegionMap().getARMLockTestHook(); - if (alth != null) - alth.beforeLock(this, event); + ARMLockTestHook armLockTestHook = getRegionMap().getARMLockTestHook(); + if (armLockTestHook != null) { + armLockTestHook.beforeLock(this, event); + } if (rvv != null) { // block new operations from being applied to the region map @@ -2044,46 +1897,44 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA checkReadiness(); // Only need to flush if NOACK at this point if (this.getAttributes().getScope().isDistributedNoAck()) { - Set<InternalDistributedMember> mbrs = getDistributionAdvisor().adviseCacheOp(); - StateFlushOperation.flushTo(mbrs, this); + Set<InternalDistributedMember> members = getDistributionAdvisor().adviseCacheOp(); + StateFlushOperation.flushTo(members, this); } } - if (alth != null) - alth.afterLock(this, null); - + if (armLockTestHook != null) { + armLockTestHook.afterLock(this, null); + } } /** * releases the locks obtained in obtainWriteLocksForClear - * - * @param participants */ private void releaseWriteLocksForClear(RegionEventImpl regionEvent, Set<InternalDistributedMember> participants) { - ARMLockTestHook alth = getRegionMap().getARMLockTestHook(); - if (alth != null) - alth.beforeRelease(this, regionEvent); + ARMLockTestHook armLockTestHook = getRegionMap().getARMLockTestHook(); + if (armLockTestHook != null) { + armLockTestHook.beforeRelease(this, regionEvent); + } getVersionVector().unlockForClear(getMyId()); DistributedClearOperation.releaseLocks(regionEvent, participants); - if (alth != null) - alth.afterRelease(this, regionEvent); - + if (armLockTestHook != null) { + armLockTestHook.afterRelease(this, regionEvent); + } } /** * Wait for in progress clears that were initiated by this member. */ private void waitForInProgressClear() { - RegionVersionVector rvv = getVersionVector(); if (rvv != null) { - synchronized (clearLock) { + synchronized (this.clearLock) { // do nothing; - // DAN - I'm a little scared that the compiler might optimize + // I'm a little scared that the compiler might optimize // away this synchronization if we really do nothing. Hence // my fine log message below. This might not be necessary. if (logger.isDebugEnabled()) { @@ -2107,12 +1958,9 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA return eventId; } - // test hook for DistributedAckRegionCCEDUnitTest - public static boolean LOCALCLEAR_TESTHOOK; - @Override void basicLocalClear(RegionEventImpl rEvent) { - if (getScope().isDistributed() && getDataPolicy().withReplication() && !LOCALCLEAR_TESTHOOK) { + if (getScope().isDistributed() && getDataPolicy().withReplication()) { throw new UnsupportedOperationException( LocalizedStrings.DistributedRegion_LOCALCLEAR_IS_NOT_SUPPORTED_ON_DISTRIBUTED_REPLICATED_REGIONS .toLocalizedString()); @@ -2124,64 +1972,63 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA return getSystem().getDistributionManager().getConfig(); } - /* - * @see SearchLoadAndWriteProcessor#initialize(LocalRegion, Object, Object) - */ - public final CacheDistributionAdvisor getDistributionAdvisor() { + @Override + public CacheDistributionAdvisor getDistributionAdvisor() { return this.distAdvisor; } + @Override public CacheDistributionAdvisor getCacheDistributionAdvisor() { return this.distAdvisor; } - public final PersistenceAdvisor getPersistenceAdvisor() { + public PersistenceAdvisor getPersistenceAdvisor() { return this.persistenceAdvisor; } - public final PersistentMemberID getPersistentID() { + public PersistentMemberID getPersistentID() { return this.persistentId; } /** Returns the distribution profile; lazily creates one if needed */ + @Override public Profile getProfile() { return this.distAdvisor.createProfile(); } - public void fillInProfile(Profile p) { - assert p instanceof CacheProfile; - CacheProfile profile = (CacheProfile) p; - profile.dataPolicy = getDataPolicy(); - profile.hasCacheLoader = basicGetLoader() != null; - profile.hasCacheWriter = basicGetWriter() != null; - profile.hasCacheListener = hasListener(); + @Override + public void fillInProfile(Profile profile) { + assert profile instanceof CacheProfile; + CacheProfile cacheProfile = (CacheProfile) profile; + cacheProfile.dataPolicy = getDataPolicy(); + cacheProfile.hasCacheLoader = basicGetLoader() != null; + cacheProfile.hasCacheWriter = basicGetWriter() != null; + cacheProfile.hasCacheListener = hasListener(); Assert.assertTrue(this.scope.isDistributed()); - profile.scope = this.scope; - profile.inRecovery = getImageState().getInRecovery(); - profile.isPersistent = getDataPolicy().withPersistence(); - profile.setSubscriptionAttributes(getSubscriptionAttributes()); - // Kishor : Below PDX check is added for rolling upgrade support. We are + cacheProfile.scope = this.scope; + cacheProfile.inRecovery = getImageState().getInRecovery(); + cacheProfile.isPersistent = getDataPolicy().withPersistence(); + cacheProfile.setSubscriptionAttributes(getSubscriptionAttributes()); + + // Below PDX check is added for rolling upgrade support. We are // removing Old wan in this checkin. PDX region are always gatewayEnabled // irrespective whether gatewayHub is configured or not. // Old version Pdx region always has this attribute true so to avoid region // attribute comparison across member we are setting it to true. - if (this.isPdxTypesRegion()) { - profile.isGatewayEnabled = true; - } else { - profile.isGatewayEnabled = false; - } - profile.serialNumber = getSerialNumber(); - profile.regionInitialized = this.isInitialized(); - profile.persistentID = getPersistentID(); + + cacheProfile.isGatewayEnabled = isPdxTypesRegion(); + cacheProfile.serialNumber = getSerialNumber(); + cacheProfile.regionInitialized = isInitialized(); + cacheProfile.persistentID = getPersistentID(); if (getPersistenceAdvisor() != null) { - profile.persistenceInitialized = getPersistenceAdvisor().isOnline(); + cacheProfile.persistenceInitialized = getPersistenceAdvisor().isOnline(); } - profile.hasCacheServer = ((this.cache.getCacheServers().size() > 0) ? true : false); - profile.requiresOldValueInEvents = this.dataPolicy.withReplication() + cacheProfile.hasCacheServer = this.cache.getCacheServers().size() > 0 ? true : false; + cacheProfile.requiresOldValueInEvents = this.dataPolicy.withReplication() && this.filterProfile != null && this.filterProfile.hasCQs(); - profile.gatewaySenderIds = getGatewaySenderIds(); - profile.asyncEventQueueIds = getVisibleAsyncEventQueueIds(); - profile.isOffHeap = getOffHeap(); + cacheProfile.gatewaySenderIds = getGatewaySenderIds(); + cacheProfile.asyncEventQueueIds = getVisibleAsyncEventQueueIds(); + cacheProfile.isOffHeap = getOffHeap(); } /** @@ -2190,25 +2037,20 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA */ public DistributedLockService getLockService() { synchronized (this.dlockMonitor) { - // Assert.assertTrue(this.scope.isGlobal()); since 7.0 this is used for distributing clear() - // ops - - String svcName = getFullPath(); + String dlsName = getFullPath(); if (this.dlockService == null) { - this.dlockService = DistributedLockService.getServiceNamed(svcName); + this.dlockService = DistributedLockService.getServiceNamed(dlsName); if (this.dlockService == null) { - this.dlockService = DLockService.create(getFullPath(), getSystem(), - true /* distributed */, false /* destroyOnDisconnect */, // region destroy will - // destroy dls - false /* automateFreeResources */); // manual freeResources only + // region destroy will destroy dls and manual freeResources only + this.dlockService = DLockService.create(getFullPath(), getSystem(), true, false, false); } // handle is-lock-grantor region attribute... if (this.isLockGrantor) { this.dlockService.becomeLockGrantor(); } if (logger.isDebugEnabled()) { - logger.debug("LockService for {} is using LockLease={}, LockTimeout=", svcName, + logger.debug("LockService for {} is using LockLease={}, LockTimeout={}", dlsName, getCache().getLockLease(), getCache().getLockTimeout()); } } @@ -2216,21 +2058,14 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA } } - /** - * @see LocalRegion#isCurrentlyLockGrantor() - */ @Override protected boolean isCurrentlyLockGrantor() { - if (!this.scope.isGlobal()) - return false; - return getLockService().isLockGrantor(); + return this.scope.isGlobal() && getLockService().isLockGrantor(); } @Override public boolean isLockGrantor() { - if (!this.scope.isGlobal()) - return false; - return this.isLockGrantor; + return this.scope.isGlobal() && this.isLockGrantor; } @Override @@ -2261,13 +2096,13 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA /** @return the deserialized value */ @Override @Retained - protected Object findObjectInSystem(KeyInfo keyInfo, boolean isCreate, TXStateInterface txState, + protected Object findObjectInSystem(KeyInfo keyInfo, boolean isCreate, TXStateInterface tx, boolean generateCallbacks, Object localValue, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones) throws CacheLoaderException, TimeoutException { + checkForLimitedOrNoAccess(); - RegionEntry re = null; final Object key = keyInfo.getKey(); final Object aCallbackArgument = keyInfo.getCallbackArg(); Operation op; @@ -2276,30 +2111,26 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA } else { op = Operation.UPDATE; } - long lastModified = 0L; - boolean fromServer = false; @Released EntryEventImpl event = null; - @Retained - Object result = null; try { - { - if (this.srp != null) { - VersionTagHolder holder = new VersionTagHolder(); - Object value = this.srp.get(key, aCallbackArgument, holder); - fromServer = value != null; - if (fromServer) { - event = EntryEventImpl.create(this, op, key, value, aCallbackArgument, false, getMyId(), - generateCallbacks); - event.setVersionTag(holder.getVersionTag()); - event.setFromServer(fromServer); // fix for bug 39358 - if (clientEvent != null && clientEvent.getVersionTag() == null) { - clientEvent.setVersionTag(holder.getVersionTag()); - } + boolean fromServer = false; + if (this.serverRegionProxy != null) { + VersionTagHolder holder = new VersionTagHolder(); + Object value = this.serverRegionProxy.get(key, aCallbackArgument, holder); + fromServer = value != null; + if (fromServer) { + event = EntryEventImpl.create(this, op, key, value, aCallbackArgument, false, getMyId(), + generateCallbacks); + event.setVersionTag(holder.getVersionTag()); + event.setFromServer(true); // fix for bug 39358 + if (clientEvent != null && clientEvent.getVersionTag() == null) { + clientEvent.setVersionTag(holder.getVersionTag()); } } } + long lastModified = 0L; if (!fromServer) { // Do not generate Event ID event = EntryEventImpl.create(this, op, key, null /* newValue */, aCallbackArgument, false, @@ -2315,7 +2146,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA try { processor.initialize(this, key, aCallbackArgument); // processor fills in event - processor.doSearchAndLoad(event, txState, localValue); + processor.doSearchAndLoad(event, tx, localValue); if (clientEvent != null && clientEvent.getVersionTag() == null) { clientEvent.setVersionTag(event.getVersionTag()); } @@ -2325,15 +2156,17 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA } } else { if (logger.isDebugEnabled()) { - logger.debug("DistributedRegion.findObjectInSystem skipping loader for region=" - + getFullPath() + "; key=" + key); + logger.debug( + "DistributedRegion.findObjectInSystem skipping loader for region={}; key={}", + getFullPath(), key); } } } + RegionEntry re = null; if (event.hasNewValue() && !isMemoryThresholdReachedForLoad()) { try { // Set eventId. Required for interested clients. - event.setNewEventId(cache.getDistributedSystem()); + event.setNewEventId(this.cache.getDistributedSystem()); long startPut = CachePerfStats.getStatTime(); validateKey(key); @@ -2345,17 +2178,17 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA // set the tail key so that the event is passed to GatewaySender queues. // if the tailKey is not set, the event gets filtered out in ParallelGatewaySenderQueue if (this instanceof BucketRegion) { - if (((BucketRegion) this).getPartitionedRegion().isParallelWanEnabled()) + if (((Bucket) this).getPartitionedRegion().isParallelWanEnabled()) ((BucketRegion) this).handleWANEvent(event); } re = basicPutEntry(event, lastModified); - } catch (ConcurrentCacheModificationException e) { + } catch (ConcurrentCacheModificationException ignore) { // the cache was modified while we were searching for this entry and // the netsearch result was elided. Return the current value from the cache re = getRegionEntry(key); if (re != null) { - event.setNewValue(re.getValue(this)); // OFFHEAP: need to incrc, copy to heap to - // setNewValue, decrc + // OFFHEAP: need to incrc, copy to heap to setNewValue, decrc + event.setNewValue(re.getValue(this)); } } if (!isTX()) { @@ -2371,6 +2204,8 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA recordMiss(re, key); } + @Retained + Object result; if (preferCD) { result = event.getRawNewValueAsHeapObject(); } else { @@ -2385,17 +2220,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA } /** - * hook for subclasses to note that a cache load was performed - * - * @see BucketRegion#performedLoad - */ - // void performedLoad(EntryEventImpl event, long lastModifiedTime, TXState txState) - // throws CacheWriterException { - // // no action in DistributedRegion - // } - - /** - * @see LocalRegion#cacheWriteBeforeDestroy(EntryEventImpl, Object) * @return true if cacheWrite was performed */ @Override @@ -2430,9 +2254,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA return result; } - /** - * @see LocalRegion#cacheWriteBeforeRegionDestroy(RegionEventImpl) - */ @Override boolean cacheWriteBeforeRegionDestroy(RegionEventImpl event) throws CacheWriterException, TimeoutException { @@ -2441,7 +2262,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA CacheWriter localWriter = basicGetWriter(); Set netWriteRecipients = localWriter == null ? this.distAdvisor.adviseNetWrite() : null; - if (localWriter != null || (netWriteRecipients != null && !netWriteRecipients.isEmpty())) { + if (localWriter != null || netWriteRecipients != null && !netWriteRecipients.isEmpty()) { final long start = getCachePerfStats().startCacheWriterCall(); try { SearchLoadAndWriteProcessor processor = SearchLoadAndWriteProcessor.getProcessor(); @@ -2473,16 +2294,16 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA } } - if (persistenceAdvisor != null) { + if (this.persistenceAdvisor != null) { this.persistenceAdvisor.close(); // fix for bug 41094 } this.distAdvisor.close(); - DLockService dls = null; // Fix for bug 46338. Wait for in progress clears before destroying the // lock service, because destroying the service immediately releases the dlock waitForInProgressClear(); + DLockService dls = null; synchronized (this.dlockMonitor) { if (this.dlockService != null) { dls = (DLockService) this.dlockService; @@ -2533,13 +2354,13 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA Set others = this.advisorListener.getInitialMembers(); CacheListener[] listeners = fetchCacheListenersField(); if (listeners != null) { - for (int i = 0; i < listeners.length; i++) { - if (listeners[i] instanceof RegionMembershipListener) { - RegionMembershipListener rml = (RegionMembershipListener) listeners[i]; + for (CacheListener listener : listeners) { + if (listener instanceof RegionMembershipListener) { + RegionMembershipListener regionMembershipListener = (RegionMembershipListener) listener; try { DistributedMember[] otherDms = new DistributedMember[others.size()]; others.toArray(otherDms); - rml.initialMembers(this, otherDms); + regionMembershipListener.initialMembers(this, otherDms); } catch (VirtualMachineError err) { SystemFailure.initiateFailure(err); // If this ever returns, rethrow the error. We're poisoned @@ -2562,7 +2383,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA } Set<String> allGatewaySenderIds = getAllGatewaySenderIds(); if (!allGatewaySenderIds.isEmpty()) { - for (GatewaySender sender : cache.getAllGatewaySenders()) { + for (GatewaySender sender : this.cache.getAllGatewaySenders()) { if (sender.isParallel() && allGatewaySenderIds.contains(sender.getId())) { // Fix for Bug#51491. Once decided to support this configuration we have call // addShadowPartitionedRegionForUserRR @@ -2576,13 +2397,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA throw new GatewaySenderConfigurationException( LocalizedStrings.ParallelGatewaySender_0_CAN_NOT_BE_USED_WITH_REPLICATED_REGION_1 .toLocalizedString(new Object[] {sender.getId(), this.getFullPath()})); - - // if (sender.isRunning()) { - // ConcurrentParallelGatewaySenderQueue parallelQueue = - // (ConcurrentParallelGatewaySenderQueue)((ParallelGatewaySenderImpl)sender) - // .getQueues().toArray(new RegionQueue[1])[0]; - // parallelQueue.addShadowPartitionedRegionForUserRR(this); - // } } } } @@ -2592,8 +2406,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA /** * Free resources held by this region. This method is invoked after isDestroyed has been set to * true. - * - * @see LocalRegion#postDestroyRegion(boolean, RegionEventImpl) */ @Override protected void postDestroyRegion(boolean destroyDiskRegion, RegionEventImpl event) { @@ -2605,7 +2417,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA // I don't think this should ever happens: bulletproofing for bug 39454 logger.warn("postDestroyRegion: encountered cancellation", e); } - } @Override @@ -2616,9 +2427,9 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA generateEventID()); distributeDestroyRegion(ev, true); distributedRegionCleanup(null); - } catch (RegionDestroyedException e) { + } catch (RegionDestroyedException ignore) { // someone else must have concurrently destroyed the region (maybe a distributed destroy) - } catch (CancelException e) { + } catch (CancelException ignore) { // cache or DS is closed, ignore } catch (VirtualMachineError e) { SystemFailure.initiateFailure(e); @@ -2629,13 +2440,10 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA } } - /** - * @see LocalRegion#handleCacheClose(Operation) - */ @Override - void handleCacheClose(Operation op) { + void handleCacheClose(Operation operation) { try { - super.handleCacheClose(op); + super.handleCacheClose(operation); } finally { distributedRegionCleanup(null); } @@ -2643,8 +2451,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA /** * invoke a cache writer before a put is performed elsewhere - * - * @see LocalRegion#cacheWriteBeforePut(EntryEventImpl, Set, CacheWriter, boolean, Object) */ @Override protected void cacheWriteBeforePut(EntryEventImpl event, Set netWriteRecipients, @@ -2700,28 +2506,33 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA } } + @Override public void addGatewaySenderId(String gatewaySenderId) { super.addGatewaySenderId(gatewaySenderId); new UpdateAttributesProcessor(this).distribute(); } + @Override public void removeGatewaySenderId(String gatewaySenderId) { super.removeGatewaySenderId(gatewaySenderId); new UpdateAttributesProcessor(this).distribute(); } + @Override public void addAsyncEventQueueId(String asyncEventQueueId) { super.addAsyncEventQueueId(asyncEventQueueId); new UpdateAttributesProcessor(this).distribute(); } + @Override public void removeAsyncEventQueueId(String asyncEventQueueId) { super.removeAsyncEventQueueId(asyncEventQueueId); new UpdateAttributesProcessor(this).distribute(); } + @Override public void checkSameSenderIdsAvailableOnAllNodes() { - List senderIds = + List<Set<String>> senderIds = this.getCacheDistributionAdvisor().adviseSameGatewaySenderIds(getGatewaySenderIds()); if (!senderIds.isEmpty()) { throw new GatewaySenderConfigurationException( @@ -2730,7 +2541,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA new Object[] {this.getName(), senderIds.get(0), senderIds.get(1)})); } - List asycnQueueIds = this.getCacheDistributionAdvisor() + List<Set<String>> asycnQueueIds = this.getCacheDistributionAdvisor() .adviseSameAsyncEventQueueIds(getVisibleAsyncEventQueueIds()); if (!asycnQueueIds.isEmpty()) { throw new GatewaySenderConfigurationException( @@ -2778,8 +2589,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA if (!dlock.tryLock(timeLeft, TimeUnit.SECONDS)) { msg = LocalizedStrings.DistributedRegion_ATTEMPT_TO_ACQUIRE_DISTRIBUTED_LOCK_FOR_0_FAILED_AFTER_WAITING_1_SECONDS; - msgArgs = - new Object[] {key, Long.valueOf((System.currentTimeMillis() - start) / 1000L)}; + msgArgs = new Object[] {key, (System.currentTimeMillis() - start) / 1000L}; break; } @@ -2787,9 +2597,9 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA } catch (InterruptedException ex) { interrupted = true; this.cache.getCancelCriterion().checkCancelInProgress(ex); - // FIXME Why is it OK to keep going? + // TODO: Why is it OK to keep going? if (lockTimeout > -1) { - timeLeft = getCache().getLockTimeout() - ((System.currentTimeMillis() - start) / 1000L); + timeLeft = getCache().getLockTimeout() - (System.currentTimeMillis() - start) / 1000L; } } finally { if (interrupted) { @@ -2800,7 +2610,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA if (msg == null) { msg = LocalizedStrings.DistributedRegion_TIMED_OUT_AFTER_WAITING_0_SECONDS_FOR_THE_DISTRIBUTED_LOCK_FOR_1; - msgArgs = new Object[] {Integer.valueOf(getCache().getLockTimeout()), key}; + msgArgs = new Object[] {getCache().getLockTimeout(), key}; } throw new TimeoutException(msg.toLocalizedString(msgArgs)); } else { @@ -2812,10 +2622,9 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA * Checks if the entry is a valid entry * * @return true if entry not null or entry is not removed - * */ protected boolean checkEntryNotValid(RegionEntry mapEntry) { - return (mapEntry == null || (mapEntry.isRemoved() && !mapEntry.isTombstone())); + return mapEntry == null || mapEntry.isRemoved() && !mapEntry.isTombstone(); } /** @@ -2823,6 +2632,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA * an iterator that uses hash ordering from the entry map, or, in the case of an overflow region, * an iterator that iterates over the entries in disk order. */ + @Override public Iterator<RegionEntry> getBestIterator(boolean includeValues) { DiskRegion dr = this.getDiskRegion(); @@ -2830,20 +2640,12 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA // Wait for the disk region to recover values first. dr.waitForAsyncRecovery(); if (dr.getNumOverflowOnDisk() > 0) { - return new DiskSavyIterator(); + return new DiskSavvyIterator(); } } return this.entries.regionEntries().iterator(); } - // /** - // * The maximum number of entries that can be put into the diskMap before - // * some of them are read from disk and returned by this iterator. - // * The larger this number the more memory this iterator is allowed to consume - // * and the better it will do in optimally reading the pending entries. - // */ - // static final long MAX_PENDING_ENTRIES = Long.getLong("gemfire.MAX_PENDING_ENTRIES", - // 1000000).longValue(); /** * Should only be used if this region has entries on disk that are not in memory. This currently * happens for overflow and for recovery when values are not recovered. The first iteration does a @@ -2851,26 +2653,19 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA * it saves it in a list sorted by the location on disk. Once the regionEntries iterator has * nothing more to iterate it starts iterating over, in disk order, the entries on disk. */ - private class DiskSavyIterator implements Iterator<RegionEntry> { + private class DiskSavvyIterator implements Iterator<RegionEntry> { private boolean usingIt = true; + private Iterator<?> it = entries.regionEntries().iterator(); + // iterator for nested ArrayLists private Iterator<RegionEntry> subIt = null; - // private final ArrayList<DiskPosition> diskList = new ArrayList<DiskPosition>(/*@todo presize - // based on number of entries only on disk*/); - // value will be either RegionEntry or an ArrayList<RegionEntry> - // private long pendingCount = 0; - private final java.util.TreeMap<DiskPage, Object> diskMap = - new java.util.TreeMap<DiskPage, Object>(); - // /** - // * used to iterate over the fullest pages at the time we have - // * added MAX_PENDING_ENTRIES to diskMap; - // */ - // private Iterator<Map.Entry<DiskPage, Object>> sortedDiskIt; + private final TreeMap<DiskPage, Object> diskMap = new TreeMap<>(); - public DiskSavyIterator() {} + DiskSavvyIterator() {} + @Override public boolean hasNext() { boolean result; if (this.subIt != null) { @@ -2881,128 +2676,68 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA return result; } } - // if (this.sortedDiskIt != null) { - // result = this.sortedDiskIt.hasNext(); - // if (!result) { - // this.sortedDiskIt = null; - // } else { - // return result; - // } - // } + result = this.it.hasNext(); if (this.usingIt && !result) { this.usingIt = false; - // long start = System.currentTimeMillis(); - // Collections.sort(this.diskList); - // long end = System.currentTimeMillis(); this.it = this.diskMap.values().iterator(); result = this.it.hasNext(); } return result; } + @Override public RegionEntry next() { for (;;) { if (this.subIt != null) { return this.subIt.next(); - // } else if (this.sortedDiskIt != null) { - // Map.Entry<DiskPage, Object> me = this.sortedDiskIt.next(); - // // remove the page from the diskMap. - // this.diskMap.remove(me.getKey()); - // Object v = me.getValue(); - // int size = 1; - // if (v instanceof ArrayList) { - // ArrayList al = (ArrayList)v; - // size = al.size(); - // // set up the iterator to start returning the entries on that page - // this.subIt = al.iterator(); - // v = this.subIt.next(); - // } - - // // decrement pendingCount by the number of entries on the page - // this.pendingCount -= size; - // // return the first region entry on this page - // return v; } if (this.usingIt) { - RegionEntry re = (RegionEntry) this.it.next(); - DiskPosition dp = new DiskPosition(); - if (re.isOverflowedToDisk(DistributedRegion.this, dp)) { - // add dp to sorted list - DiskPage dPage = new DiskPage(dp); - Object v = this.diskMap.get(dPage); - if (v == null) { - this.diskMap.put(dPage, re); - } else if (v instanceof ArrayList) { - ArrayList al = (ArrayList) v; - al.add(re); + RegionEntry regionEntry = (RegionEntry) this.it.next(); + DiskPosition diskPosition = new DiskPosition(); + if (regionEntry.isOverflowedToDisk(DistributedRegion.this, diskPosition)) { + // add diskPosition to sorted list + DiskPage dPage = new DiskPage(diskPosition); + Object value = this.diskMap.get(dPage); + if (value == null) { + this.diskMap.put(dPage, regionEntry); + } else if (value instanceof ArrayList) { + List list = (List) value; + list.add(regionEntry); } else { - ArrayList al = new ArrayList(); - al.add(v); - al.add(re); - this.diskMap.put(dPage, al); + List list = new ArrayList(); + list.add(value); + list.add(regionEntry); + this.diskMap.put(dPage, list); } if (!hasNext()) { assert false; // must be true } - // this.pendingCount++; - // if (this.usingIt && this.pendingCount >= MAX_PENDING_ENTRIES) { - // // find the pages that have the most entries - // int largestPage = 1; - // ArrayList<Map.Entry<DiskPage, Object>> largestPages - // = new ArrayList<Map.Entry<DiskPage, Object>>(); - // for (Map.Entry<DiskPage, Object> me: this.diskMap.entrySet()) { - // int meSize = 1; - // if (me.getValue() instanceof ArrayList) { - // meSize = ((ArrayList)me.getValue()).size(); - // } - // if (meSize > largestPage) { - // largestPage = meSize; - // largestPages.clear(); // throw away smaller pages - // largestPages.add(me); - // } else if (meSize == largestPage) { - // largestPages.add(me); - // } else { - // // ignore this page - // } - // } - // Collections.sort(largestPages, new Comparator - // <Map.Entry<DiskPage, Object>>() { - // /** - // * Note: this comparator imposes orderings that are inconsistent - // * with equals. - // */ - // public int compare(Map.Entry<DiskPage, Object> o1, Map.Entry<DiskPage, Object> o2) { - // return o1.getKey().compareTo(o2.getKey()); - // } - // }); - // this.sortedDiskIt = largestPages.iterator(); - // // loop around and fetch first value from sortedDiskIt - // } } else { - return re; + return regionEntry; } } else { - Object v = this.it.next(); - if (v instanceof ArrayList) { - ArrayList al = (ArrayList) v; - this.subIt = al.iterator(); + Object value = this.it.next(); + if (value instanceof ArrayList) { + List list = (List) value; + this.subIt = list.iterator(); return this.subIt.next(); } else { - return (RegionEntry) v; + return (RegionEntry) value; } } } } + @Override public void remove() { throw new UnsupportedOperationException(); } } public static class DiskPosition implements Comparable<DiskPosition> { - private long oplogId; - private long offset; + long oplogId; // package-private to avoid synthetic accessor + long offset; // package-private to avoid synthetic accessor DiskPosition() {} @@ -3013,19 +2748,21 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA @Override public int hashCode() { + // TODO: Object instantiation inside 'hashCode()' is bad return Long.valueOf(this.oplogId ^ this.offset).hashCode(); } @Override - public boolean equals(Object o) { - if (o instanceof DiskPosition) { - DiskPosition other = (DiskPosition) o; + public boolean equals(Object obj) { + if (obj instanceof DiskPosition) { + DiskPosition other = (DiskPosition) obj; return this.oplogId == other.oplogId && this.offset == other.offset; } else { return false; } } + @Override public int compareTo(DiskPosition o) { int result = Long.signum(this.oplogId - o.oplogId); if (result == 0) { @@ -3036,18 +2773,19 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA @Override public String toString() { - StringBuffer sb = new StringBuffer(); - sb.append("<").append(this.oplogId).append(":").append(this.offset).append(">"); + StringBuilder sb = new StringBuilder(); + sb.append('<').append(this.oplogId).append(':').append(this.offset).append('>'); return sb.toString(); } } + static class DiskPage extends DiskPosition { static final long DISK_PAGE_SIZE = - Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "DISK_PAGE_SIZE", 8 * 1024L).longValue(); + Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "DISK_PAGE_SIZE", 8 << 10); - DiskPage(DiskPosition dp) { - this.setPosition(dp.oplogId, dp.offset / DISK_PAGE_SIZE); + DiskPage(DiskPosition diskPosition) { + this.setPosition(diskPosition.oplogId, diskPosition.offset / DISK_PAGE_SIZE); } } @@ -3055,7 +2793,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA * Returns the lock lease value to use for DistributedLock and RegionDistributedLock. -1 is * supported as non-expiring lock. */ - protected long getLockLeaseForLock() { + long getLockLeaseForLock() { // package-private to avoid synthetic accessor if (getCache().getLockLease() == -1) { return -1; } @@ -3066,24 +2804,22 @@ public class DistributedReg
<TRUNCATED>