http://git-wip-us.apache.org/repos/asf/geode/blob/640b9f03/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionEntry.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionEntry.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionEntry.java index daa31fd..1bc31b3 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionEntry.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionEntry.java @@ -12,13 +12,23 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ - package org.apache.geode.internal.cache; +import static org.apache.geode.internal.offheap.annotations.OffHeapIdentifier.*; + +import java.io.IOException; +import java.util.Arrays; + +import org.apache.logging.log4j.Logger; + import org.apache.geode.CancelException; import org.apache.geode.InvalidDeltaException; import org.apache.geode.SystemFailure; -import org.apache.geode.cache.*; +import org.apache.geode.cache.CacheWriterException; +import org.apache.geode.cache.EntryEvent; +import org.apache.geode.cache.EntryNotFoundException; +import org.apache.geode.cache.Operation; +import org.apache.geode.cache.TimeoutException; import org.apache.geode.cache.query.IndexMaintenanceException; import org.apache.geode.cache.query.QueryException; import org.apache.geode.cache.query.internal.index.IndexManager; @@ -28,18 +38,32 @@ import org.apache.geode.cache.util.GatewayConflictResolver; import org.apache.geode.distributed.internal.DM; import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; -import org.apache.geode.internal.*; +import org.apache.geode.internal.Assert; +import org.apache.geode.internal.ByteArrayDataInput; +import org.apache.geode.internal.HeapDataOutputStream; +import org.apache.geode.internal.InternalDataSerializer; +import org.apache.geode.internal.InternalStatisticsDisabledException; +import org.apache.geode.internal.Version; import org.apache.geode.internal.cache.lru.LRUClockNode; import org.apache.geode.internal.cache.lru.NewLRUClockHand; import org.apache.geode.internal.cache.persistence.DiskStoreID; -import org.apache.geode.internal.cache.versions.*; +import org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException; +import org.apache.geode.internal.cache.versions.RegionVersionVector; +import org.apache.geode.internal.cache.versions.VersionSource; +import org.apache.geode.internal.cache.versions.VersionStamp; +import org.apache.geode.internal.cache.versions.VersionTag; import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl; import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.lang.StringUtils; import org.apache.geode.internal.logging.LogService; import org.apache.geode.internal.logging.log4j.LocalizedMessage; import org.apache.geode.internal.logging.log4j.LogMarker; -import org.apache.geode.internal.offheap.*; +import org.apache.geode.internal.offheap.MemoryAllocator; +import org.apache.geode.internal.offheap.MemoryAllocatorImpl; +import org.apache.geode.internal.offheap.OffHeapHelper; +import org.apache.geode.internal.offheap.ReferenceCountHelper; +import org.apache.geode.internal.offheap.Releasable; +import org.apache.geode.internal.offheap.StoredObject; import org.apache.geode.internal.offheap.annotations.Released; import org.apache.geode.internal.offheap.annotations.Retained; import org.apache.geode.internal.offheap.annotations.Unretained; @@ -53,24 +77,14 @@ import org.apache.geode.pdx.PdxSerializationException; import org.apache.geode.pdx.PdxSerializer; import org.apache.geode.pdx.internal.ConvertableToBytes; import org.apache.geode.pdx.internal.PdxInstanceImpl; -import org.apache.logging.log4j.Logger; - -import java.io.IOException; -import java.util.Arrays; - -import static org.apache.geode.internal.offheap.annotations.OffHeapIdentifier.ABSTRACT_REGION_ENTRY_FILL_IN_VALUE; -import static org.apache.geode.internal.offheap.annotations.OffHeapIdentifier.ABSTRACT_REGION_ENTRY_PREPARE_VALUE_FOR_CACHE; /** * Abstract implementation class of RegionEntry interface. This is the topmost implementation class * so common behavior lives here. * * @since GemFire 3.5.1 - * - * */ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Object, Object> { - private static final Logger logger = LogService.getLogger(); /** @@ -83,39 +97,37 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje /* * Flags for a Region Entry. These flags are stored in the msb of the long used to also store the - * lastModicationTime. + * lastModificationTime. */ private static final long VALUE_RESULT_OF_SEARCH = 0x01L << 56; + private static final long UPDATE_IN_PROGRESS = 0x02L << 56; - private static final long TOMBSTONE_SCHEDULED = 0x04L << 56; + private static final long LISTENER_INVOCATION_IN_PROGRESS = 0x08L << 56; + /** used for LRUEntry instances. */ protected static final long RECENTLY_USED = 0x10L << 56; + /** used for LRUEntry instances. */ protected static final long EVICTED = 0x20L << 56; + /** * Set if the entry is being used by a transactions. Some features (eviction and expiration) will * not modify an entry when a tx is using it to prevent the tx to fail do to conflict. */ - protected static final long IN_USE_BY_TX = 0x40L << 56; - - - protected static final long MARKED_FOR_EVICTION = 0x80L << 56; - // public Exception removeTrace; // debugging hot loop in AbstractRegionMap.basicPut() + private static final long IN_USE_BY_TX = 0x40L << 56; protected AbstractRegionEntry(RegionEntryContext context, @Retained(ABSTRACT_REGION_ENTRY_PREPARE_VALUE_FOR_CACHE) Object value) { - setValue(context, this.prepareValueForCache(context, value, false), false); - // setLastModified(System.currentTimeMillis()); [bruce] this must be set later so we can use ==0 + setValue(context, prepareValueForCache(context, value, false), false); + + // setLastModified(System.currentTimeMillis()); this must be set later so we can use ==0 // to know this is a new entry in checkForConflicts } - ///////////////////////////////////////////////////////////////////// - ////////////////////////// instance methods ///////////////////////// - ///////////////////////////////////////////////////////////////////// - - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IMSE_DONT_CATCH_IMSE") + @Override + @SuppressWarnings("IMSE_DONT_CATCH_IMSE") public boolean dispatchListenerEvents(final EntryEventImpl event) throws InterruptedException { final LocalRegion rgn = event.getRegion(); @@ -151,14 +163,17 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje } } + @Override public long getLastAccessed() throws InternalStatisticsDisabledException { throw new InternalStatisticsDisabledException(); } + @Override public long getHitCount() throws InternalStatisticsDisabledException { throw new InternalStatisticsDisabledException(); } + @Override public long getMissCount() throws InternalStatisticsDisabledException { throw new InternalStatisticsDisabledException(); } @@ -185,41 +200,46 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje _setLastModified(lastModified); } + @Override public void txDidDestroy(long currTime) { setLastModifiedAndAccessedTimes(currTime, currTime); } - public final void updateStatsForPut(long lastModifiedTime, long lastAccessedTime) { + @Override + public void updateStatsForPut(long lastModifiedTime, long lastAccessedTime) { setLastModifiedAndAccessedTimes(lastModifiedTime, lastAccessedTime); } + @Override public void setRecentlyUsed() { // do nothing by default; only needed for LRU } + @Override public void updateStatsForGet(boolean hit, long time) { // nothing needed } + @Override public void resetCounts() throws InternalStatisticsDisabledException { throw new InternalStatisticsDisabledException(); } - public void _removePhase1() { + void _removePhase1() { _setValue(Token.REMOVED_PHASE1); - // debugging for 38467 (hot thread in ARM.basicUpdate) - // this.removeTrace = new Exception("stack trace for thread " + Thread.currentThread()); } - public void removePhase1(LocalRegion r, boolean isClear) throws RegionClearedException { + @Override + public void removePhase1(LocalRegion r, boolean clear) throws RegionClearedException { _removePhase1(); } + @Override public void removePhase2() { _setValue(Token.REMOVED_PHASE2); - // this.removeTrace = new Exception("stack trace for thread " + Thread.currentThread()); } + @Override public void makeTombstone(LocalRegion r, VersionTag version) throws RegionClearedException { assert r.getVersionVector() != null; assert version != null; @@ -237,7 +257,7 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje r.unscheduleTombstone(this); } setRecentlyUsed(); - boolean newEntry = (getValueAsToken() == Token.REMOVED_PHASE1); + boolean newEntry = getValueAsToken() == Token.REMOVED_PHASE1; basicMakeTombstone(r); r.scheduleTombstone(this, version); if (newEntry) { @@ -259,14 +279,13 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje } } - @Override public void setValueWithTombstoneCheck(@Unretained Object v, EntryEvent e) throws RegionClearedException { if (v == Token.TOMBSTONE) { - makeTombstone((LocalRegion) e.getRegion(), ((EntryEventImpl) e).getVersionTag()); + makeTombstone((LocalRegion) e.getRegion(), ((InternalCacheEvent) e).getVersionTag()); } else { - setValue((LocalRegion) e.getRegion(), v, (EntryEventImpl) e); + setValue((RegionEntryContext) e.getRegion(), v, (EntryEventImpl) e); } } @@ -278,81 +297,86 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje * this method calls Token.isRemoved, and places that don't want a destroyed Token can explicitly * check for a DESTROY token. */ - public final boolean isRemoved() { + @Override + public boolean isRemoved() { Token o = getValueAsToken(); - return (o == Token.REMOVED_PHASE1) || (o == Token.REMOVED_PHASE2) || (o == Token.TOMBSTONE); + return o == Token.REMOVED_PHASE1 || o == Token.REMOVED_PHASE2 || o == Token.TOMBSTONE; } - public final boolean isDestroyedOrRemoved() { + @Override + public boolean isDestroyedOrRemoved() { return Token.isRemoved(getValueAsToken()); } - public final boolean isDestroyedOrRemovedButNotTombstone() { + @Override + public boolean isDestroyedOrRemovedButNotTombstone() { Token o = getValueAsToken(); return o == Token.DESTROYED || o == Token.REMOVED_PHASE1 || o == Token.REMOVED_PHASE2; } - public final boolean isTombstone() { + @Override + public boolean isTombstone() { return getValueAsToken() == Token.TOMBSTONE; } - public final boolean isRemovedPhase2() { + @Override + public boolean isRemovedPhase2() { return getValueAsToken() == Token.REMOVED_PHASE2; } + @Override public boolean fillInValue(LocalRegion region, - @Retained(ABSTRACT_REGION_ENTRY_FILL_IN_VALUE) InitialImageOperation.Entry dst, + @Retained(ABSTRACT_REGION_ENTRY_FILL_IN_VALUE) InitialImageOperation.Entry entry, ByteArrayDataInput in, DM mgr) { - dst.setSerialized(false); // starting default value + + // starting default value + entry.setSerialized(false); @Retained(ABSTRACT_REGION_ENTRY_FILL_IN_VALUE) final Object v; if (isTombstone()) { v = Token.TOMBSTONE; } else { - v = getValue(region); // OFFHEAP: need to incrc, copy bytes, decrc + // OFFHEAP: need to incrc, copy bytes, decrc + v = getValue(region); if (v == null) { return false; } } - dst.setLastModified(mgr, getLastModified()); // fix for bug 31059 + entry.setLastModified(mgr, getLastModified()); // fix for bug 31059 if (v == Token.INVALID) { - dst.setInvalid(); + entry.setInvalid(); } else if (v == Token.LOCAL_INVALID) { - dst.setLocalInvalid(); + entry.setLocalInvalid(); } else if (v == Token.TOMBSTONE) { - dst.setTombstone(); + entry.setTombstone(); } else if (v instanceof CachedDeserializable) { // don't serialize here if it is not already serialized CachedDeserializable cd = (CachedDeserializable) v; if (!cd.isSerialized()) { - dst.value = cd.getDeserializedForReading(); + entry.value = cd.getDeserializedForReading(); } else { - { - Object tmp = cd.getValue(); - if (tmp instanceof byte[]) { - byte[] bb = (byte[]) tmp; - dst.value = bb; - } else { - try { - HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT); - BlobHelper.serializeTo(tmp, hdos); - hdos.trim(); - dst.value = hdos; - } catch (IOException e) { - RuntimeException e2 = new IllegalArgumentException( - LocalizedStrings.AbstractRegionEntry_AN_IOEXCEPTION_WAS_THROWN_WHILE_SERIALIZING - .toLocalizedString()); - e2.initCause(e); - throw e2; - } + Object tmp = cd.getValue(); + if (tmp instanceof byte[]) { + entry.value = tmp; + } else { + try { + HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT); + BlobHelper.serializeTo(tmp, hdos); + hdos.trim(); + entry.value = hdos; + } catch (IOException e) { + throw new IllegalArgumentException( + LocalizedStrings.AbstractRegionEntry_AN_IOEXCEPTION_WAS_THROWN_WHILE_SERIALIZING + .toLocalizedString(), + e); } - dst.setSerialized(true); } + entry.setSerialized(true); } } else if (v instanceof byte[]) { - dst.value = v; + entry.value = v; } else { Object preparedValue = v; if (preparedValue != null) { @@ -361,20 +385,17 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje return false; } } - { - try { - HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT); - BlobHelper.serializeTo(preparedValue, hdos); - hdos.trim(); - dst.value = hdos; - dst.setSerialized(true); - } catch (IOException e) { - RuntimeException e2 = new IllegalArgumentException( - LocalizedStrings.AbstractRegionEntry_AN_IOEXCEPTION_WAS_THROWN_WHILE_SERIALIZING - .toLocalizedString()); - e2.initCause(e); - throw e2; - } + try { + HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT); + BlobHelper.serializeTo(preparedValue, hdos); + hdos.trim(); + entry.value = hdos; + entry.setSerialized(true); + } catch (IOException e) { + throw new IllegalArgumentException( + LocalizedStrings.AbstractRegionEntry_AN_IOEXCEPTION_WAS_THROWN_WHILE_SERIALIZING + .toLocalizedString(), + e); } } return true; @@ -385,7 +406,7 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje * * @return the value to provide to the gii request; null if no value should be provided. */ - public static Object prepareValueForGII(Object v) { + static Object prepareValueForGII(Object v) { assert v != null; if (v instanceof GatewaySenderEventImpl) { return ((GatewaySenderEventImpl) v).makeHeapCopyIfOffHeap(); @@ -394,6 +415,7 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje } } + @Override public boolean isOverflowedToDisk(LocalRegion r, DistributedRegion.DiskPosition dp) { return false; } @@ -403,19 +425,14 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje ReferenceCountHelper.createReferenceCountOwner(); @Retained Object result = _getValueRetain(context, true); - // Asif: If the thread is an Index Creation Thread & the value obtained is + + // If the thread is an Index Creation Thread & the value obtained is // Token.REMOVED , we can skip synchronization block. This is required to prevent // the dead lock caused if an Index Update Thread has gone into a wait holding the // lock of the Entry object. There should not be an issue if the Index creation thread // gets the temporary value of token.REMOVED as the correct value will get indexed // by the Index Update Thread , once the index creation thread has exited. // Part of Bugfix # 33336 - // if ((result == Token.REMOVED_PHASE1 || result == Token.REMOVED_PHASE2) && - // !r.isIndexCreationThread()) { - // synchronized (this) { - // result = _getValue(); - // } - // } if (Token.isRemoved(result)) { ReferenceCountHelper.setReferenceCountOwner(null); @@ -445,7 +462,7 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje @Released public void setValue(RegionEntryContext context, @Unretained Object value) throws RegionClearedException { - // @todo darrel: This will mark new entries as being recently used + // TODO: This will mark new entries as being recently used // It might be better to only mark them when they are modified. // Or should we only mark them on reads? setValue(context, value, true); @@ -467,10 +484,9 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje } } - public void releaseOffHeapRefIfRegionBeingClosedOrDestroyed(RegionEntryContext context, - Object ref) { + void releaseOffHeapRefIfRegionBeingClosedOrDestroyed(RegionEntryContext context, Object ref) { if (isOffHeapReference(ref) && isThisRegionBeingClosedOrDestroyed(context)) { - ((OffHeapRegionEntry) this).release(); + ((Releasable) this).release(); } } @@ -503,7 +519,7 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje return value; } - static protected Object compress(RegionEntryContext context, Object value) { + protected static Object compress(RegionEntryContext context, Object value) { return compress(context, value, null); } @@ -515,7 +531,7 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje * * @return the compressed form of the value parameter. */ - static protected Object compress(RegionEntryContext context, Object value, EntryEventImpl event) { + protected static Object compress(RegionEntryContext context, Object value, EntryEventImpl event) { if (isCompressible(context, value)) { long time = context.getCachePerfStats().startCompression(); byte[] serializedValue; @@ -569,12 +585,14 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje } - public final Object getValueInVM(RegionEntryContext context) { + @Override + public Object getValueInVM(RegionEntryContext context) { ReferenceCountHelper.createReferenceCountOwner(); @Released Object v = _getValueRetain(context, true); - if (v == null) { // should only be possible if disk entry + if (v == null) { + // should only be possible if disk entry v = Token.NOT_AVAILABLE; } Object result = OffHeapHelper.copyAndReleaseIfNeeded(v); @@ -582,6 +600,7 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje return result; } + @Override public Object getValueInVMOrDiskWithoutFaultIn(LocalRegion owner) { return getValueInVM(owner); } @@ -591,56 +610,56 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje public Object getValueOffHeapOrDiskWithoutFaultIn(LocalRegion owner) { @Retained Object result = _getValueRetain(owner, true); - // if (result instanceof ByteSource) { - // // If the ByteSource contains a Delta or ListOfDelta then we want to deserialize it - // Object deserVal = ((CachedDeserializable)result).getDeserializedForReading(); - // if (deserVal != result) { - // OffHeapHelper.release(result); - // result = deserVal; - // } - // } return result; } + @Override public Object getValueOnDisk(LocalRegion r) throws EntryNotFoundException { throw new IllegalStateException( LocalizedStrings.AbstractRegionEntry_CANNOT_GET_VALUE_ON_DISK_FOR_A_REGION_THAT_DOES_NOT_ACCESS_THE_DISK .toLocalizedString()); } - public Object getSerializedValueOnDisk(final LocalRegion r) throws EntryNotFoundException { + @Override + public Object getSerializedValueOnDisk(final LocalRegion localRegion) + throws EntryNotFoundException { throw new IllegalStateException( LocalizedStrings.AbstractRegionEntry_CANNOT_GET_VALUE_ON_DISK_FOR_A_REGION_THAT_DOES_NOT_ACCESS_THE_DISK .toLocalizedString()); } + @Override public Object getValueOnDiskOrBuffer(LocalRegion r) throws EntryNotFoundException { throw new IllegalStateException( LocalizedStrings.AbstractRegionEntry_CANNOT_GET_VALUE_ON_DISK_FOR_A_REGION_THAT_DOES_NOT_ACCESS_THE_DISK .toLocalizedString()); - // @todo darrel if value is Token.REMOVED || Token.DESTROYED throw EntryNotFoundException + // TODO: if value is Token.REMOVED || Token.DESTROYED throw EntryNotFoundException } - public final boolean initialImagePut(final LocalRegion region, final long lastModifiedTime, - Object newValue, boolean wasRecovered, boolean versionTagAccepted) - throws RegionClearedException { + @Override + public boolean initialImagePut(final LocalRegion region, final long lastModified, Object newValue, + boolean wasRecovered, boolean acceptedVersionTag) throws RegionClearedException { // note that the caller has already write synced this RegionEntry - return initialImageInit(region, lastModifiedTime, newValue, this.isTombstone(), wasRecovered, - versionTagAccepted); + return initialImageInit(region, lastModified, newValue, this.isTombstone(), wasRecovered, + acceptedVersionTag); } - public boolean initialImageInit(final LocalRegion region, final long lastModifiedTime, + @Override + public boolean initialImageInit(final LocalRegion region, final long lastModified, final Object newValue, final boolean create, final boolean wasRecovered, - final boolean versionTagAccepted) throws RegionClearedException { + final boolean acceptedVersionTag) throws RegionClearedException { + // note that the caller has already write synced this RegionEntry boolean result = false; + // if it has been destroyed then don't do anything Token vTok = getValueAsToken(); - if (versionTagAccepted || create || (vTok != Token.DESTROYED || vTok != Token.TOMBSTONE)) { // OFFHEAP - // noop + if (acceptedVersionTag || create || (vTok != Token.DESTROYED || vTok != Token.TOMBSTONE)) { + // OFFHEAP noop Object newValueToWrite = newValue; - boolean putValue = versionTagAccepted || create || (newValueToWrite != Token.LOCAL_INVALID - && (wasRecovered || (vTok == Token.LOCAL_INVALID))); // OFFHEAP noop + // OFFHEAP noop + boolean putValue = acceptedVersionTag || create || (newValueToWrite != Token.LOCAL_INVALID + && (wasRecovered || (vTok == Token.LOCAL_INVALID))); if (region.isUsedForPartitionedRegionAdmin() && newValueToWrite instanceof CachedDeserializable) { @@ -650,9 +669,9 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje newValueToWrite = ((CachedDeserializable) newValueToWrite).getDeserializedValue(region, null); if (!create && newValueToWrite instanceof Versionable) { - final Object oldValue = getValueInVM(region); // Heap value should always be deserialized - // at this point // OFFHEAP will not be - // deserialized + // Heap value should always be deserialized at this point // OFFHEAP will not be + // deserialized + final Object oldValue = getValueInVM(region); // BUGFIX for 35029. If oldValue is null the newValue should be put. if (oldValue == null) { putValue = true; @@ -667,7 +686,7 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje if (putValue) { // change to INVALID if region itself has been invalidated, // and current value is recovered - if (create || versionTagAccepted) { + if (create || acceptedVersionTag) { // At this point, since we now always recover from disk first, // we only care about "isCreate" since "isRecovered" is impossible // if we had a regionInvalidate or regionClear @@ -690,12 +709,12 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje } } if (!entryOK) { - // Asif: If the region has been issued cleared during + // If the region has been issued cleared during // the GII , then those entries loaded before this one would have // been cleared from the Map due to clear operation & for the // currententry whose key may have escaped the clearance , will be // cleansed by the destroy token. - newValueToWrite = Token.DESTROYED; + newValueToWrite = Token.DESTROYED; // TODO: never used imageState.addDestroyedEntry(this.getKey()); throw new RegionClearedException( LocalizedStrings.AbstractRegionEntry_DURING_THE_GII_PUT_OF_ENTRY_THE_REGION_GOT_CLEARED_SO_ABORTING_THE_OPERATION @@ -710,7 +729,7 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje if (create) { region.getCachePerfStats().incCreates(); } - region.updateStatsForPut(this, lastModifiedTime, false); + region.updateStatsForPut(this, lastModified, false); } if (logger.isTraceEnabled()) { @@ -732,84 +751,77 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje * @throws EntryNotFoundException if expectedOldValue is not null and is not equal to current * value */ + @Override @Released - public final boolean destroy(LocalRegion region, EntryEventImpl event, boolean inTokenMode, + public boolean destroy(LocalRegion region, EntryEventImpl event, boolean inTokenMode, boolean cacheWrite, @Unretained Object expectedOldValue, boolean forceDestroy, boolean removeRecoveredEntry) throws CacheWriterException, EntryNotFoundException, TimeoutException, RegionClearedException { - boolean proceed = false; - { - // A design decision was made to not retrieve the old value from the disk - // if the entry has been evicted to only have the CacheListener afterDestroy - // method ignore it. We don't want to pay the performance penalty. The - // getValueInVM method does not retrieve the value from disk if it has been - // evicted. Instead, it uses the NotAvailable token. - // - // If the region is a WAN queue region, the old value is actually used by the - // afterDestroy callback on a secondary. It is not needed on a primary. - // Since the destroy that sets WAN_QUEUE_TOKEN always originates on the primary - // we only pay attention to WAN_QUEUE_TOKEN if the event is originRemote. - // - // :ezoerner:20080814 We also read old value from disk or buffer - // in the case where there is a non-null expectedOldValue - // see PartitionedRegion#remove(Object key, Object value) - ReferenceCountHelper.skipRefCountTracking(); - @Retained - @Released - Object curValue = _getValueRetain(region, true); - ReferenceCountHelper.unskipRefCountTracking(); - try { - if (curValue == null) - curValue = Token.NOT_AVAILABLE; - - if (curValue == Token.NOT_AVAILABLE) { - // In some cases we need to get the current value off of disk. - - // if the event is transmitted during GII and has an old value, it was - // the state of the transmitting cache's entry & should be used here - if (event.getCallbackArgument() != null - && event.getCallbackArgument().equals(RegionQueue.WAN_QUEUE_TOKEN) - && event.isOriginRemote()) { // check originRemote for bug 40508 - // curValue = getValue(region); can cause deadlock if GII is occurring + + // A design decision was made to not retrieve the old value from the disk + // if the entry has been evicted to only have the CacheListener afterDestroy + // method ignore it. We don't want to pay the performance penalty. The + // getValueInVM method does not retrieve the value from disk if it has been + // evicted. Instead, it uses the NotAvailable token. + // + // If the region is a WAN queue region, the old value is actually used by the + // afterDestroy callback on a secondary. It is not needed on a primary. + // Since the destroy that sets WAN_QUEUE_TOKEN always originates on the primary + // we only pay attention to WAN_QUEUE_TOKEN if the event is originRemote. + // + // We also read old value from disk or buffer + // in the case where there is a non-null expectedOldValue + // see PartitionedRegion#remove(Object key, Object value) + ReferenceCountHelper.skipRefCountTracking(); + @Retained + @Released + Object curValue = _getValueRetain(region, true); + ReferenceCountHelper.unskipRefCountTracking(); + boolean proceed; + try { + if (curValue == null) { + curValue = Token.NOT_AVAILABLE; + } + + if (curValue == Token.NOT_AVAILABLE) { + // In some cases we need to get the current value off of disk. + + // if the event is transmitted during GII and has an old value, it was + // the state of the transmitting cache's entry & should be used here + if (event.getCallbackArgument() != null + && event.getCallbackArgument().equals(RegionQueue.WAN_QUEUE_TOKEN) + && event.isOriginRemote()) { // check originRemote for bug 40508 + // curValue = getValue(region); can cause deadlock if GII is occurring + curValue = getValueOnDiskOrBuffer(region); + } else { + FilterProfile fp = region.getFilterProfile(); + if (fp != null && (fp.getCqCount() > 0 || expectedOldValue != null)) { + // curValue = getValue(region); can cause deadlock will fault in the value + // and will confuse LRU. curValue = getValueOnDiskOrBuffer(region); - } else { - FilterProfile fp = region.getFilterProfile(); - if (fp != null && ((fp.getCqCount() > 0) || expectedOldValue != null)) { - // curValue = getValue(region); can cause deadlock will fault in the value - // and will confuse LRU. rdubey. - curValue = getValueOnDiskOrBuffer(region); - } } } + } - if (expectedOldValue != null) { - if (!checkExpectedOldValue(expectedOldValue, curValue, region)) { - throw new EntryNotFoundException( - LocalizedStrings.AbstractRegionEntry_THE_CURRENT_VALUE_WAS_NOT_EQUAL_TO_EXPECTED_VALUE - .toLocalizedString()); - } + if (expectedOldValue != null) { + if (!checkExpectedOldValue(expectedOldValue, curValue, region)) { + throw new EntryNotFoundException( + LocalizedStrings.AbstractRegionEntry_THE_CURRENT_VALUE_WAS_NOT_EQUAL_TO_EXPECTED_VALUE + .toLocalizedString()); } + } - if (inTokenMode && event.hasOldValue()) { - proceed = true; - } else { - proceed = event.setOldValue(curValue, curValue instanceof GatewaySenderEventImpl) - || removeRecoveredEntry || forceDestroy || region.getConcurrencyChecksEnabled() // fix - // for - // bug - // #47868 - // - - // create - // a - // tombstone - || (event.getOperation() == Operation.REMOVE // fix for bug #42242 - && (curValue == null || curValue == Token.LOCAL_INVALID - || curValue == Token.INVALID)); - } - } finally { - OffHeapHelper.releaseWithNoTracking(curValue); + if (inTokenMode && event.hasOldValue()) { + proceed = true; + } else { + proceed = event.setOldValue(curValue, curValue instanceof GatewaySenderEventImpl) + || removeRecoveredEntry || forceDestroy || region.getConcurrencyChecksEnabled() + || (event.getOperation() == Operation.REMOVE && (curValue == null + || curValue == Token.LOCAL_INVALID || curValue == Token.INVALID)); } - } // end curValue block + } finally { + OffHeapHelper.releaseWithNoTracking(curValue); + } if (proceed) { // Generate the version tag if needed. This method should only be @@ -820,7 +832,8 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje } if (cacheWrite) { region.cacheWriteBeforeDestroy(event, expectedOldValue); - if (event.getRegion().getServerProxy() != null) { // server will return a version tag + if (event.getRegion().getServerProxy() != null) { + // server will return a version tag // update version information (may throw ConcurrentCacheModificationException) VersionStamp stamp = getVersionStamp(); if (stamp != null) { @@ -856,8 +869,8 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje boolean removeEntry = false; VersionTag v = event.getVersionTag(); if (region.concurrencyChecksEnabled && !removeRecoveredEntry - && !event.isFromRILocalDestroy()) { // bug #46780, don't retain tombstones for entries - // destroyed for register-interest + && !event.isFromRILocalDestroy()) { + // bug #46780, don't retain tombstones for entries destroyed for register-interest // Destroy will write a tombstone instead if (v == null || !v.hasValidVersion()) { // localDestroy and eviction and ops received with no version tag @@ -869,7 +882,7 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje event.setVersionTag(v); } } - removeEntry = (v == null) || !v.hasValidVersion(); + removeEntry = v == null || !v.hasValidVersion(); } else { removeEntry = true; } @@ -894,12 +907,11 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje } } - - static boolean checkExpectedOldValue(@Unretained Object expectedOldValue, @Unretained Object actualValue, LocalRegion lr) { + if (Token.isInvalid(expectedOldValue)) { - return (actualValue == null) || Token.isInvalid(actualValue); + return actualValue == null || Token.isInvalid(actualValue); } else { boolean isCompressedOffHeap = lr.getAttributes().getOffHeap() && lr.getAttributes().getCompressor() != null; @@ -974,7 +986,7 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje } } - static boolean checkEquals(@Unretained Object v1, @Unretained Object v2, + private static boolean checkEquals(@Unretained Object v1, @Unretained Object v2, boolean isCompressedOffHeap) { // need to give PdxInstance#equals priority if (v1 instanceof PdxInstance) { @@ -1057,7 +1069,8 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje if (pi != null) { return checkPdxEquals(pi, obj); } - if (isCompressedOffHeap) { // fix for bug 52248 + if (isCompressedOffHeap) { + // fix for bug 52248 byte[] serializedObj; if (obj instanceof CachedDeserializable) { serializedObj = ((CachedDeserializable) obj).getSerializedValue(); @@ -1066,7 +1079,7 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje } return Arrays.equals(cdValBytes, serializedObj); } else { - /** + /* * To be more compatible with previous releases do not compare the serialized forms here. * Instead deserialize and call the equals method. */ @@ -1124,28 +1137,27 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje } } if (obj != null && obj.getClass().getName().equals(pdx.getClassName())) { - GemFireCacheImpl gfc = GemFireCacheImpl.getForPdx("Could not access Pdx registry"); - if (gfc != null) { + InternalCache internalCache = GemFireCacheImpl.getForPdx("Could not access Pdx registry"); + if (internalCache != null) { PdxSerializer pdxSerializer; if (obj instanceof PdxSerializable) { pdxSerializer = null; } else { - pdxSerializer = gfc.getPdxSerializer(); + pdxSerializer = internalCache.getPdxSerializer(); } if (pdxSerializer != null || obj instanceof PdxSerializable) { // try to convert obj to a PdxInstance HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT); try { if (InternalDataSerializer.autoSerialized(obj, hdos) - || InternalDataSerializer.writePdx(hdos, gfc, obj, pdxSerializer)) { - PdxInstance pi = InternalDataSerializer.readPdxInstance(hdos.toByteArray(), gfc); + || InternalDataSerializer.writePdx(hdos, internalCache, obj, pdxSerializer)) { + PdxInstance pi = + InternalDataSerializer.readPdxInstance(hdos.toByteArray(), internalCache); if (pi != null) { obj = pi; } } - } catch (IOException ignore) { - // we are not able to convert it so just fall through - } catch (PdxSerializationException ignore) { + } catch (IOException | PdxSerializationException ignore) { // we are not able to convert it so just fall through } } @@ -1155,16 +1167,13 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje return basicEquals(obj, pdx); } - - ///////////////////////////////////////////////////////////// - /////////////////////////// fields ////////////////////////// - ///////////////////////////////////////////////////////////// // Do not add any instance fields to this class. // Instead add them to LeafRegionEntry.cpp public static class HashRegionEntryCreator implements CustomEntryConcurrentHashMap.HashEntryCreator<Object, Object> { + @Override public HashEntry<Object, Object> newEntry(final Object key, final int hash, final HashEntry<Object, Object> next, final Object value) { final AbstractRegionEntry entry = (AbstractRegionEntry) value; @@ -1181,14 +1190,16 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje return entry; } + @Override public int keyHashCode(final Object key, final boolean compareValues) { return CustomEntryConcurrentHashMap.keyHash(key, compareValues); } - }; + } + @Override public abstract Object getKey(); - protected static boolean okToStoreOffHeap(Object v, AbstractRegionEntry e) { + private static boolean okToStoreOffHeap(Object v, AbstractRegionEntry e) { if (v == null) return false; if (Token.isInvalidOrRemoved(v)) @@ -1215,7 +1226,7 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje private static final long LAST_MODIFIED_MASK = 0x00FFFFFFFFFFFFFFL; - protected final void _setLastModified(long lastModifiedTime) { + protected void _setLastModified(long lastModifiedTime) { if (lastModifiedTime < 0 || lastModifiedTime > LAST_MODIFIED_MASK) { throw new IllegalStateException("Expected lastModifiedTime " + lastModifiedTime + " to be >= 0 and <= " + LAST_MODIFIED_MASK); @@ -1223,31 +1234,32 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje long storedValue; long newValue; do { - storedValue = getlastModifiedField(); + storedValue = getLastModifiedField(); newValue = storedValue & ~LAST_MODIFIED_MASK; newValue |= lastModifiedTime; } while (!compareAndSetLastModifiedField(storedValue, newValue)); } - protected abstract long getlastModifiedField(); + protected abstract long getLastModifiedField(); protected abstract boolean compareAndSetLastModifiedField(long expectedValue, long newValue); - public final long getLastModified() { - return getlastModifiedField() & LAST_MODIFIED_MASK; + @Override + public long getLastModified() { + return getLastModifiedField() & LAST_MODIFIED_MASK; } - protected final boolean areAnyBitsSet(long bitMask) { - return (getlastModifiedField() & bitMask) != 0L; + protected boolean areAnyBitsSet(long bitMask) { + return (getLastModifiedField() & bitMask) != 0L; } /** * Any bits in "bitMask" that are 1 will be set. */ - protected final void setBits(long bitMask) { - boolean done = false; + protected void setBits(long bitMask) { + boolean done; do { - long bits = getlastModifiedField(); + long bits = getLastModifiedField(); long newBits = bits | bitMask; if (bits == newBits) return; @@ -1258,10 +1270,10 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje /** * Any bits in "bitMask" that are 0 will be cleared. */ - protected final void clearBits(long bitMask) { - boolean done = false; + protected void clearBits(long bitMask) { + boolean done; do { - long bits = getlastModifiedField(); + long bits = getLastModifiedField(); long newBits = bits & bitMask; if (bits == newBits) return; @@ -1329,6 +1341,7 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje data = (byte[]) val; } byte[] compressedData = compressBytes(r, data); + // TODO: array comparison is broken boolean isCompressed = compressedData != data; ReferenceCountHelper.setReferenceCountOwner(this); MemoryAllocator ma = MemoryAllocatorImpl.getAllocator(); // fix for bug 47875 @@ -1350,6 +1363,7 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje try { byte[] data = ((ConvertableToBytes) nv).toBytes(); byte[] compressedData = compressBytes(r, data); + // TODO: array comparison is broken if (data == compressedData) { nv = CachedDeserializableFactory.create(data); } else { @@ -1366,15 +1380,17 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje @Override @Unretained - public final Object _getValue() { + public Object _getValue() { return getValueField(); } - public final boolean isUpdateInProgress() { + @Override + public boolean isUpdateInProgress() { return areAnyBitsSet(UPDATE_IN_PROGRESS); } - public final void setUpdateInProgress(final boolean underUpdate) { + @Override + public void setUpdateInProgress(final boolean underUpdate) { if (underUpdate) { setBits(UPDATE_IN_PROGRESS); } else { @@ -1382,13 +1398,14 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje } } - - public final boolean isCacheListenerInvocationInProgress() { + @Override + public boolean isCacheListenerInvocationInProgress() { return areAnyBitsSet(LISTENER_INVOCATION_IN_PROGRESS); } - public final void setCacheListenerInvocationInProgress(final boolean listenerInvoked) { - if (listenerInvoked) { + @Override + public void setCacheListenerInvocationInProgress(final boolean isListenerInvoked) { + if (isListenerInvoked) { setBits(LISTENER_INVOCATION_IN_PROGRESS); } else { clearBits(~LISTENER_INVOCATION_IN_PROGRESS); @@ -1396,12 +1413,12 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje } @Override - public final boolean isInUseByTransaction() { + public boolean isInUseByTransaction() { return areAnyBitsSet(IN_USE_BY_TX); } @Override - public final void setInUseByTransaction(final boolean v) { + public void setInUseByTransaction(final boolean v) { if (v) { setBits(IN_USE_BY_TX); } else { @@ -1410,17 +1427,13 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje } @Override - public final synchronized void incRefCount() { + public synchronized void incRefCount() { TXManagerImpl.incRefCount(this); setInUseByTransaction(true); } - /** - * {@inheritDoc} - */ - @Override - public final synchronized void decRefCount(NewLRUClockHand lruList, LocalRegion lr) { + public synchronized void decRefCount(NewLRUClockHand lruList, LocalRegion lr) { if (TXManagerImpl.decRefCount(this)) { if (isInUseByTransaction()) { setInUseByTransaction(false); @@ -1436,7 +1449,7 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje } @Override - public final synchronized void resetRefCount(NewLRUClockHand lruList) { + public synchronized void resetRefCount(NewLRUClockHand lruList) { if (isInUseByTransaction()) { setInUseByTransaction(false); if (lruList != null) { @@ -1445,7 +1458,7 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje } } - protected final void _setValue(Object val) { + void _setValue(Object val) { setValueField(val); } @@ -1474,16 +1487,19 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje */ protected abstract void setValueField(@Unretained Object v); + @Override @Retained public Object getTransformedValue() { return _getValueRetain(null, false); } - public final boolean getValueWasResultOfSearch() { + @Override + public boolean getValueWasResultOfSearch() { return areAnyBitsSet(VALUE_RESULT_OF_SEARCH); } - public final void setValueResultOfSearch(boolean v) { + @Override + public void setValueResultOfSearch(boolean v) { if (v) { setBits(VALUE_RESULT_OF_SEARCH); } else { @@ -1493,26 +1509,22 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje public boolean hasValidVersion() { VersionStamp stamp = (VersionStamp) this; - boolean has = stamp.getRegionVersion() != 0 || stamp.getEntryVersion() != 0; - return has; + return stamp.getRegionVersion() != 0 || stamp.getEntryVersion() != 0; } + @Override public boolean hasStats() { // override this in implementations that have stats return false; } - /** - * @see HashEntry#getMapValue() - */ - public final Object getMapValue() { + @Override + public Object getMapValue() { return this; } - /** - * @see HashEntry#setMapValue(Object) - */ - public final void setMapValue(final Object newValue) { + @Override + public void setMapValue(final Object newValue) { if (this != newValue) { Assert.fail("AbstractRegionEntry#setMapValue: unexpected setMapValue " + "with newValue=" + newValue + ", this=" + this); @@ -1522,47 +1534,40 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje protected abstract void setEntryHash(int v); @Override - public final String toString() { + public String toString() { final StringBuilder sb = new StringBuilder(this.getClass().getSimpleName()).append('@') .append(Integer.toHexString(System.identityHashCode(this))).append(" ("); return appendFieldsToString(sb).append(')').toString(); } protected StringBuilder appendFieldsToString(final StringBuilder sb) { - sb.append("key=").append(getKey()).append("; rawValue=").append(_getValue()); // OFFHEAP - // _getValue ok: - // the current - // toString on - // ObjectChunk is - // safe to use - // without incing - // refcount. + // OFFHEAP _getValue ok: the current toString on ObjectChunk is safe to use without incing + // refcount. + sb.append("key=").append(getKey()).append("; rawValue=").append(_getValue()); VersionStamp stamp = getVersionStamp(); if (stamp != null) { - sb.append("; version=").append(stamp.asVersionTag() + ";member=" + stamp.getMemberID()); + sb.append("; version=").append(stamp.asVersionTag()).append(";member=") + .append(stamp.getMemberID()); } return sb; } - /* - * (non-Javadoc) This generates version tags for outgoing messages for all subclasses supporting - * concurrency versioning. It also sets the entry's version stamp to the tag's values. - * - * @see - * org.apache.geode.internal.cache.RegionEntry#generateVersionTag(org.apache.geode.distributed. - * DistributedMember, boolean) + /** + * This generates version tags for outgoing messages for all subclasses supporting concurrency + * versioning. It also sets the entry's version stamp to the tag's values. */ - public VersionTag generateVersionTag(VersionSource mbr, boolean withDelta, LocalRegion region, + @Override + public VersionTag generateVersionTag(VersionSource member, boolean withDelta, LocalRegion region, EntryEventImpl event) { VersionStamp stamp = this.getVersionStamp(); - if (stamp != null && region.getServerProxy() == null) { // clients do not generate versions + if (stamp != null && region.getServerProxy() == null) { + // clients do not generate versions int v = stamp.getEntryVersion() + 1; if (v > 0xFFFFFF) { v -= 0x1000000; // roll-over } VersionSource previous = stamp.getMemberID(); - // For non persistent regions, we allow the member to be null and // when we send a message and the remote side can determine the member // from the sender. For persistent regions, we need to send @@ -1570,14 +1575,14 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje // // TODO - RVV - optimize the way we send the persistent id to save // space. - if (mbr == null) { + if (member == null) { VersionSource regionMember = region.getVersionMember(); if (regionMember instanceof DiskStoreID) { - mbr = regionMember; + member = regionMember; } } - VersionTag tag = VersionTag.create(mbr); + VersionTag tag = VersionTag.create(member); tag.setEntryVersion(v); if (region.getVersionVector() != null) { // Use region version if already provided, else generate @@ -1617,14 +1622,14 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje tag.setDistributedSystemId(dsid); } stamp.setVersions(tag); - stamp.setMemberID(mbr); + stamp.setMemberID(member); event.setVersionTag(tag); if (logger.isDebugEnabled()) { logger.debug( "generated tag {}; key={}; oldvalue={} newvalue={} client={} region={}; rvv={}", tag, event.getKey(), event.getOldValueStringForm(), event.getNewValueStringForm(), - (event.getContext() == null ? "none" - : event.getContext().getDistributedMember().getName()), + event.getContext() == null ? "none" + : event.getContext().getDistributedMember().getName(), region.getFullPath(), region.getVersionVector()); } return tag; @@ -1632,32 +1637,16 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje return null; } - /** set/unset the flag noting that a tombstone has been scheduled for this entry */ - public void setTombstoneScheduled(boolean scheduled) { - if (scheduled) { - setBits(TOMBSTONE_SCHEDULED); - } else { - clearBits(~TOMBSTONE_SCHEDULED); - } - } - /** - * return the flag noting whether a tombstone has been scheduled for this entry. This should be - * called under synchronization on the region entry if you want an accurate result. - */ - public boolean isTombstoneScheduled() { - return areAnyBitsSet(TOMBSTONE_SCHEDULED); - } - - /* - * (non-Javadoc) This performs a concurrency check. + * This performs a concurrency check. * * This check compares the version number first, followed by the member ID. * * Wraparound of the version number is detected and handled by extending the range of versions by * one bit. * - * The normal membership ID comparison method is used.<p> + * The normal membership ID comparison method is used. + * <p> * * Note that a tag from a remote (WAN) system may be in the event. If this is the case this method * will either invoke a user plugin that allows/disallows the event (and may modify the value) or @@ -1665,16 +1654,12 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje * distributedSystemIDs. * * @throws ConcurrentCacheModificationException if the event conflicts with an event that has - * already been applied to the entry. - * - * @see - * org.apache.geode.internal.cache.RegionEntry#concurrencyCheck(org.apache.geode.cache.EntryEvent) + * already been applied to the entry. */ public void processVersionTag(EntryEvent cacheEvent) { processVersionTag(cacheEvent, true); } - protected void processVersionTag(EntryEvent cacheEvent, boolean conflictCheck) { EntryEventImpl event = (EntryEventImpl) cacheEvent; VersionTag tag = event.getVersionTag(); @@ -1746,10 +1731,8 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje } if (r.getVersionVector() != null && r.getServerProxy() == null - && (r.getDataPolicy().withPersistence() || !r.getScope().isLocal())) { // bug #45258 - - // perf degradation - // for local - // regions and RVV + && (r.getDataPolicy().withPersistence() || !r.getScope().isLocal())) { + // bug #45258 - perf degradation for local regions and RVV VersionSource who = tag.getMemberID(); if (who == null) { who = originator; @@ -1760,29 +1743,22 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje assert !tag.isFromOtherMember() || tag.getMemberID() != null : "remote tag is missing memberID"; - - // [bruce] for a long time I had conflict checks turned off in clients when + // for a long time I had conflict checks turned off in clients when // receiving a response from a server and applying it to the cache. This lowered // the CPU cost of versioning but eventually had to be pulled for bug #45453 - // if (r.getServerProxy() != null && conflictCheck) { - // // events coming from servers while a local sync is held on the entry - // // do not require a conflict check. Conflict checks were already - // // performed on the server and here we just consume whatever was sent back. - // // Event.isFromServer() returns true for client-update messages and - // // for putAll/getAll, which do not hold syncs during the server operation. - // conflictCheck = event.isFromServer(); - // } - // else - - // [bruce] for a very long time we had conflict checks turned off for PR buckets. + + // events coming from servers while a local sync is held on the entry + // do not require a conflict check. Conflict checks were already + // performed on the server and here we just consume whatever was sent back. + // Event.isFromServer() returns true for client-update messages and + // for putAll/getAll, which do not hold syncs during the server operation. + + // for a very long time we had conflict checks turned off for PR buckets. // Bug 45669 showed a primary dying in the middle of distribution. This caused // one backup bucket to have a v2. The other bucket was promoted to primary and // generated a conflicting v2. We need to do the check so that if this second // v2 loses to the original one in the delta-GII operation that the original v2 // will be the winner in both buckets. - // if (r.isUsedForPartitionedRegionBucket()) { - // conflictCheck = false; // primary/secondary model - // } // The new value in event is not from GII, even it could be tombstone basicProcessVersionTag(r, tag, false, eventHasDelta, dmId, originator, conflictCheck); @@ -1792,26 +1768,22 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje } } - protected final void basicProcessVersionTag(LocalRegion region, VersionTag tag, + protected void basicProcessVersionTag(LocalRegion region, VersionTag tag, boolean isTombstoneFromGII, boolean deltaCheck, VersionSource dmId, InternalDistributedMember sender, boolean checkForConflict) { - StringBuilder verbose = null; - if (tag != null) { VersionStamp stamp = getVersionStamp(); + StringBuilder verbose = null; if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) { VersionTag stampTag = stamp.asVersionTag(); - if (stampTag.hasValidVersion() && checkForConflict) { // only be verbose here if there's a - // possibility we might reject the - // operation + if (stampTag.hasValidVersion() && checkForConflict) { + // only be verbose here if there's a possibility we might reject the operation verbose = new StringBuilder(); - verbose.append( - "processing tag for key " + getKey() + ", stamp=" + stamp.asVersionTag() + ", tag=") - .append(tag).append(", checkForConflict=").append(checkForConflict); // .append(", - // current - // value=").append(_getValue()); + verbose.append("processing tag for key ").append(getKey()).append(", stamp=") + .append(stamp.asVersionTag()).append(", tag=").append(tag) + .append(", checkForConflict=").append(checkForConflict); } } @@ -1854,10 +1826,8 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje } } - private void applyVersionTag(LocalRegion region, VersionStamp stamp, VersionTag tag, InternalDistributedMember sender) { - // stamp.setPreviousMemberID(stamp.getMemberID()); VersionSource mbr = tag.getMemberID(); if (mbr == null) { mbr = sender; @@ -1876,23 +1846,20 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje } /** perform conflict checking for a stamp/tag */ - protected boolean checkForConflict(LocalRegion region, VersionStamp stamp, VersionTag tag, + private boolean checkForConflict(LocalRegion region, VersionStamp stamp, VersionTag tag, boolean isTombstoneFromGII, boolean deltaCheck, VersionSource dmId, InternalDistributedMember sender, StringBuilder verbose) { int stampVersion = stamp.getEntryVersion(); int tagVersion = tag.getEntryVersion(); - boolean throwex = false; - boolean apply = false; - if (stamp.getVersionTimeStamp() != 0) { // new entries have no timestamp // check for wrap-around on the version number long difference = tagVersion - stampVersion; if (0x10000 < difference || difference < -0x10000) { if (verbose != null) { - verbose - .append("\nversion rollover detected: tag=" + tagVersion + " stamp=" + stampVersion); + verbose.append("\nversion rollover detected: tag=").append(tagVersion).append(" stamp=") + .append(stampVersion); } if (difference < 0) { tagVersion += 0x1000000L; @@ -1909,6 +1876,8 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje checkForDeltaConflict(region, stampVersion, tagVersion, stamp, tag, dmId, sender, verbose); } + boolean throwex = false; + boolean apply = false; if (stampVersion == 0 || stampVersion < tagVersion) { if (verbose != null) { verbose.append(" - applying change"); @@ -1968,11 +1937,7 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje verbose.append(" - disallowing duplicate marked with posdup"); } throwex = true; - } else /* - * if (isTombstoneFromGII && isTombstone()) { if (verbose != null) { - * verbose.append(" - disallowing duplicate tombstone from GII"); } return false; // - * bug #49601 don't schedule tombstones from GII if there's already one here } else - */ { + } else { if (verbose != null) { verbose.append(" - allowing duplicate"); } @@ -1991,7 +1956,7 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje private boolean isExpiredTombstone(LocalRegion region, long timestamp, boolean isTombstone) { return isTombstone - && (timestamp + TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT) <= region.cacheTimeMillis(); + && timestamp + TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT <= region.cacheTimeMillis(); } private boolean overwritingOldTombstone(LocalRegion region, VersionStamp stamp, VersionTag tag, @@ -2022,8 +1987,6 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje * for an event containing a delta we must check to see if the tag's previous member id is the * stamp's member id and ensure that the version is only incremented by 1. Otherwise the delta is * being applied to a value that does not match the source of the delta. - * - * @throws InvalidDeltaException */ private void checkForDeltaConflict(LocalRegion region, long stampVersion, long tagVersion, VersionStamp stamp, VersionTag tag, VersionSource dmId, InternalDistributedMember sender, @@ -2102,21 +2065,19 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje } @Override - public void changeEventValue(Object v) { - newValue[0] = v; + public void changeEventValue(Object value) { + newValue[0] = value; } }; + @Released TimestampedEntryEventImpl timestampedEvent = (TimestampedEntryEventImpl) event .getTimestampedEvent(tagDsid, stampDsid, tagTime, stampTime); // gateway conflict resolvers will usually want to see the old value if (!timestampedEvent.hasOldValue() && isRemoved()) { - timestampedEvent.setOldValue(getValue(timestampedEvent.getRegion())); // OFFHEAP: since - // isRemoved I think - // getValue will never - // be stored off heap - // in this case + // OFFHEAP: since isRemoved I think getValue will never be stored off heap in this case + timestampedEvent.setOldValue(getValue(timestampedEvent.getRegion())); } Throwable thr = null; @@ -2144,7 +2105,7 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje } if (isDebugEnabled) { - logger.debug("done invoking resolver {}", thr); + logger.debug("done invoking resolver", thr); } if (thr == null) { if (disallow[0]) { @@ -2174,58 +2135,55 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje if (isDebugEnabled) { logger.debug("performing normal WAN conflict check"); } - if (tagTime > stampTime || (tagTime == stampTime && tagDsid >= stampDsid)) { + if (tagTime > stampTime || tagTime == stampTime && tagDsid >= stampDsid) { if (isDebugEnabled) { logger.debug("allowing event"); } return true; } if (isDebugEnabled) { - logger.debug("disallowing event for " + event.getKey()); + logger.debug("disallowing event for {}", event.getKey()); } throw new ConcurrentCacheModificationException("conflicting WAN event detected"); } static boolean isCompressible(RegionEntryContext context, Object value) { - return ((value != null) && (context != null) && (context.getCompressor() != null) - && !Token.isInvalidOrRemoved(value)); + return value != null && context != null && context.getCompressor() != null + && !Token.isInvalidOrRemoved(value); } /* subclasses supporting versions must override this */ + @Override public VersionStamp getVersionStamp() { return null; } + @Override public boolean isValueNull() { - return (null == getValueAsToken()); + return null == getValueAsToken(); } + @Override public boolean isInvalid() { return Token.isInvalid(getValueAsToken()); } + @Override public boolean isDestroyed() { return Token.isDestroyed(getValueAsToken()); } + @Override public void setValueToNull() { _setValue(null); } + @Override public boolean isInvalidOrRemoved() { return Token.isInvalidOrRemoved(getValueAsToken()); } /** - * Maximum size of a string that can be encoded as char. - */ - public static final int MAX_INLINE_STRING_KEY_CHAR_ENCODING = 7; - /** - * Maximum size of a string that can be encoded as byte. - */ - public static final int MAX_INLINE_STRING_KEY_BYTE_ENCODING = 15; - - /** * This is only retained in off-heap subclasses. However, it's marked as Retained here so that * callers are aware that the value may be retained. */
http://git-wip-us.apache.org/repos/asf/geode/blob/640b9f03/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java index bc9fcdf..e0fc27c 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java @@ -12,12 +12,29 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ - package org.apache.geode.internal.cache; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.logging.log4j.Logger; + import org.apache.geode.GemFireIOException; import org.apache.geode.InvalidDeltaException; -import org.apache.geode.cache.*; +import org.apache.geode.cache.CacheEvent; +import org.apache.geode.cache.CacheWriter; +import org.apache.geode.cache.CacheWriterException; +import org.apache.geode.cache.DiskAccessException; +import org.apache.geode.cache.EntryNotFoundException; +import org.apache.geode.cache.Operation; +import org.apache.geode.cache.RegionDestroyedException; +import org.apache.geode.cache.TimeoutException; +import org.apache.geode.cache.TransactionId; import org.apache.geode.cache.query.IndexMaintenanceException; import org.apache.geode.cache.query.QueryException; import org.apache.geode.cache.query.internal.index.IndexManager; @@ -37,7 +54,12 @@ import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier; import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID; import org.apache.geode.internal.cache.tier.sockets.ClientUpdateMessageImpl; import org.apache.geode.internal.cache.tier.sockets.HAEventWrapper; -import org.apache.geode.internal.cache.versions.*; +import org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException; +import org.apache.geode.internal.cache.versions.RegionVersionVector; +import org.apache.geode.internal.cache.versions.VersionHolder; +import org.apache.geode.internal.cache.versions.VersionSource; +import org.apache.geode.internal.cache.versions.VersionStamp; +import org.apache.geode.internal.cache.versions.VersionTag; import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl; import org.apache.geode.internal.concurrent.MapCallbackAdapter; import org.apache.geode.internal.i18n.LocalizedStrings; @@ -54,21 +76,12 @@ import org.apache.geode.internal.offheap.annotations.Unretained; import org.apache.geode.internal.sequencelog.EntryLogger; import org.apache.geode.internal.util.concurrent.CustomEntryConcurrentHashMap; -import org.apache.logging.log4j.Logger; - -import java.util.*; -import java.util.concurrent.atomic.AtomicInteger; - /** * Abstract implementation of {@link RegionMap}that has all the common behavior. * * @since GemFire 3.5.1 - * - * */ - public abstract class AbstractRegionMap implements RegionMap { - private static final Logger logger = LogService.getLogger(); /** The underlying map for this region. */ @@ -81,10 +94,15 @@ public abstract class AbstractRegionMap implements RegionMap { static Runnable testHookRunnableFor48182 = null; private RegionEntryFactory entryFactory; + private Attributes attr; - private transient Object owner; // the region that owns this map - protected AbstractRegionMap(InternalRegionArguments internalRegionArgs) {} + // the region that owns this map + private Object owner; + + protected AbstractRegionMap(InternalRegionArguments internalRegionArgs) { + // do nothing + } protected void initialize(Object owner, Attributes attr, InternalRegionArguments internalRegionArgs, boolean isLRU) { @@ -93,22 +111,19 @@ public abstract class AbstractRegionMap implements RegionMap { _setMap(createConcurrentMap(attr.initialCapacity, attr.loadFactor, attr.concurrencyLevel, false, new AbstractRegionEntry.HashRegionEntryCreator())); - final GemFireCacheImpl cache; boolean isDisk; - boolean withVersioning = false; - boolean offHeap = false; + boolean withVersioning; + boolean offHeap; if (owner instanceof LocalRegion) { LocalRegion region = (LocalRegion) owner; isDisk = region.getDiskRegion() != null; - cache = region.getGemFireCache(); withVersioning = region.getConcurrencyChecksEnabled(); offHeap = region.getOffHeap(); } else if (owner instanceof PlaceHolderDiskRegion) { - offHeap = ((PlaceHolderDiskRegion) owner).getOffHeap(); + offHeap = ((RegionEntryContext) owner).getOffHeap(); isDisk = true; withVersioning = - ((PlaceHolderDiskRegion) owner).getFlags().contains(DiskRegionFlag.IS_WITH_VERSIONING); - cache = GemFireCacheImpl.getInstance(); + ((DiskRegionView) owner).getFlags().contains(DiskRegionFlag.IS_WITH_VERSIONING); } else { throw new IllegalStateException("expected LocalRegion or PlaceHolderDiskRegion"); } @@ -117,15 +132,15 @@ public abstract class AbstractRegionMap implements RegionMap { attr.statisticsEnabled, isLRU, isDisk, withVersioning, offHeap)); } - protected CustomEntryConcurrentHashMap<Object, Object> createConcurrentMap(int initialCapacity, + private CustomEntryConcurrentHashMap<Object, Object> createConcurrentMap(int initialCapacity, float loadFactor, int concurrencyLevel, boolean isIdentityMap, CustomEntryConcurrentHashMap.HashEntryCreator<Object, Object> entryCreator) { if (entryCreator != null) { - return new CustomEntryConcurrentHashMap<Object, Object>(initialCapacity, loadFactor, - concurrencyLevel, isIdentityMap, entryCreator); + return new CustomEntryConcurrentHashMap<>(initialCapacity, loadFactor, concurrencyLevel, + isIdentityMap, entryCreator); } else { - return new CustomEntryConcurrentHashMap<Object, Object>(initialCapacity, loadFactor, - concurrencyLevel, isIdentityMap); + return new CustomEntryConcurrentHashMap<>(initialCapacity, loadFactor, concurrencyLevel, + isIdentityMap); } } @@ -1548,7 +1563,6 @@ public abstract class AbstractRegionMap implements RegionMap { final boolean isDebugEnabled = logger.isDebugEnabled(); final LocalRegion owner = _getOwner(); - owner.checkBeforeEntrySync(txEvent); final boolean isRegionReady = !inTokenMode; final boolean hasRemoteOrigin = !((TXId) txId).getMemberId().equals(owner.getMyId()); @@ -2337,7 +2351,6 @@ public abstract class AbstractRegionMap implements RegionMap { TXEntryState txEntryState, VersionTag versionTag, long tailKey) { // boolean didInvalidate = false; final LocalRegion owner = _getOwner(); - owner.checkBeforeEntrySync(txEvent); @Released EntryEventImpl cbEvent = null; @@ -2408,8 +2421,7 @@ public abstract class AbstractRegionMap implements RegionMap { } catch (RegionClearedException rce) { clearOccured = true; } - owner.txApplyInvalidatePart2(oldRe, oldRe.getKey(), didDestroy, true, - clearOccured); + owner.txApplyInvalidatePart2(oldRe, oldRe.getKey(), didDestroy, true); // didInvalidate = true; if (invokeCallbacks) { switchEventOwnerAndOriginRemote(cbEvent, hasRemoteOrigin); @@ -2455,7 +2467,7 @@ public abstract class AbstractRegionMap implements RegionMap { } catch (RegionClearedException rce) { clearOccured = true; } - owner.txApplyInvalidatePart2(newRe, newRe.getKey(), didDestroy, true, clearOccured); + owner.txApplyInvalidatePart2(newRe, newRe.getKey(), didDestroy, true); if (invokeCallbacks) { switchEventOwnerAndOriginRemote(cbEvent, hasRemoteOrigin); @@ -2527,7 +2539,7 @@ public abstract class AbstractRegionMap implements RegionMap { } catch (RegionClearedException rce) { clearOccured = true; } - owner.txApplyInvalidatePart2(re, re.getKey(), didDestroy, true, clearOccured); + owner.txApplyInvalidatePart2(re, re.getKey(), didDestroy, true); // didInvalidate = true; if (invokeCallbacks) { switchEventOwnerAndOriginRemote(cbEvent, hasRemoteOrigin); @@ -3080,7 +3092,6 @@ public abstract class AbstractRegionMap implements RegionMap { Operation putOp = p_putOp; - owner.checkBeforeEntrySync(txEvent); Object newValue = nv; final boolean hasRemoteOrigin = !((TXId) txId).getMemberId().equals(owner.getMyId()); @@ -3175,7 +3186,7 @@ public abstract class AbstractRegionMap implements RegionMap { long lastMod = owner.cacheTimeMillis(); EntryLogger.logTXPut(_getOwnerObject(), key, nv); re.updateStatsForPut(lastMod, lastMod); - owner.txApplyPutPart2(re, re.getKey(), newValue, lastMod, false, didDestroy, + owner.txApplyPutPart2(re, re.getKey(), lastMod, false, didDestroy, clearOccured); } } finally { @@ -3200,7 +3211,7 @@ public abstract class AbstractRegionMap implements RegionMap { } } if (didDestroy && !opCompleted) { - owner.txApplyInvalidatePart2(re, re.getKey(), true, false, false /* clear */); + owner.txApplyInvalidatePart2(re, re.getKey(), true, false /* clear */); } } if (owner.concurrencyChecksEnabled && txEntryState != null && cbEvent != null) { @@ -3275,8 +3286,8 @@ public abstract class AbstractRegionMap implements RegionMap { long lastMod = owner.cacheTimeMillis(); EntryLogger.logTXPut(_getOwnerObject(), key, nv); oldRe.updateStatsForPut(lastMod, lastMod); - owner.txApplyPutPart2(oldRe, oldRe.getKey(), newValue, lastMod, false, - didDestroy, clearOccured); + owner.txApplyPutPart2(oldRe, oldRe.getKey(), lastMod, false, didDestroy, + clearOccured); } } finally { if (oldRe != null && owner.indexMaintenanceSynchronous) { @@ -3341,7 +3352,7 @@ public abstract class AbstractRegionMap implements RegionMap { long lastMod = owner.cacheTimeMillis(); EntryLogger.logTXPut(_getOwnerObject(), key, nv); newRe.updateStatsForPut(lastMod, lastMod); - owner.txApplyPutPart2(newRe, newRe.getKey(), newValue, lastMod, true, didDestroy, + owner.txApplyPutPart2(newRe, newRe.getKey(), lastMod, true, didDestroy, clearOccured); } } finally { http://git-wip-us.apache.org/repos/asf/geode/blob/640b9f03/geode-core/src/main/java/org/apache/geode/internal/cache/AddCacheServerProfileMessage.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AddCacheServerProfileMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AddCacheServerProfileMessage.java index 6928ad2..ff0101b 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/AddCacheServerProfileMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AddCacheServerProfileMessage.java @@ -44,8 +44,9 @@ public class AddCacheServerProfileMessage extends SerialDistributionMessage protected void process(DistributionManager dm) { int oldLevel = LocalRegion.setThreadInitLevelRequirement(LocalRegion.BEFORE_INITIAL_IMAGE); try { - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); - if (cache != null && !cache.isClosed()) { // will be null if not initialized + InternalCache cache = GemFireCacheImpl.getInstance(); + // will be null if not initialized + if (cache != null && !cache.isClosed()) { operateOnCache(cache); } } finally { @@ -55,16 +56,16 @@ public class AddCacheServerProfileMessage extends SerialDistributionMessage reply.setRecipient(getSender()); try { dm.putOutgoing(reply); - } catch (CancelException e) { + } catch (CancelException ignore) { // can't send a reply, so ignore the exception } } } - private void operateOnCache(GemFireCacheImpl cache) { + private void operateOnCache(InternalCache cache) { final boolean isDebugEnabled = logger.isDebugEnabled(); - for (DistributedRegion r : this.getDistributedRegions(cache)) { + for (DistributedRegion r : getDistributedRegions(cache)) { CacheDistributionAdvisor cda = (CacheDistributionAdvisor) r.getDistributionAdvisor(); CacheDistributionAdvisor.CacheProfile cp = (CacheDistributionAdvisor.CacheProfile) cda.getProfile(getSender()); @@ -91,16 +92,16 @@ public class AddCacheServerProfileMessage extends SerialDistributionMessage } /** set the hasCacheServer flags for all regions in this cache */ - public void operateOnLocalCache(GemFireCacheImpl cache) { + public void operateOnLocalCache(InternalCache cache) { int oldLevel = LocalRegion.setThreadInitLevelRequirement(LocalRegion.BEFORE_INITIAL_IMAGE); try { - for (LocalRegion r : this.getAllRegions(cache)) { + for (LocalRegion r : getAllRegions(cache)) { FilterProfile fp = r.getFilterProfile(); if (fp != null) { fp.getLocalProfile().hasCacheServer = true; } } - for (PartitionedRegion r : this.getPartitionedRegions(cache)) { + for (PartitionedRegion r : getPartitionedRegions(cache)) { FilterProfile fp = r.getFilterProfile(); if (fp != null) { fp.getLocalProfile().hasCacheServer = true; @@ -112,13 +113,13 @@ public class AddCacheServerProfileMessage extends SerialDistributionMessage } - private Set<LocalRegion> getAllRegions(GemFireCacheImpl gfc) { - return gfc.getAllRegions(); + private Set<LocalRegion> getAllRegions(InternalCache internalCache) { + return internalCache.getAllRegions(); } - private Set<DistributedRegion> getDistributedRegions(GemFireCacheImpl gfc) { - Set<DistributedRegion> result = new HashSet(); - for (LocalRegion r : gfc.getAllRegions()) { + private Set<DistributedRegion> getDistributedRegions(InternalCache internalCache) { + Set<DistributedRegion> result = new HashSet<>(); + for (LocalRegion r : internalCache.getAllRegions()) { if (r instanceof DistributedRegion) { result.add((DistributedRegion) r); } @@ -126,14 +127,14 @@ public class AddCacheServerProfileMessage extends SerialDistributionMessage return result; } - private Set<PartitionedRegion> getPartitionedRegions(GemFireCacheImpl gfc) { - Set<PartitionedRegion> result = new HashSet(gfc.getPartitionedRegions()); - return result; + private Set<PartitionedRegion> getPartitionedRegions(InternalCache internalCache) { + return (Set<PartitionedRegion>) new HashSet(internalCache.getPartitionedRegions()); } /** for deserialization only */ public AddCacheServerProfileMessage() {} + @Override public int getDSFID() { return ADD_CACHESERVER_PROFILE_UPDATE; } http://git-wip-us.apache.org/repos/asf/geode/blob/640b9f03/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java index 8b8705a..7b35cb5 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java @@ -14,6 +14,30 @@ */ package org.apache.geode.internal.cache; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.Serializable; +import java.util.AbstractSet; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.logging.log4j.Logger; + import org.apache.geode.CancelException; import org.apache.geode.DataSerializer; import org.apache.geode.cache.CacheClosedException; @@ -42,16 +66,6 @@ import org.apache.geode.internal.logging.LogService; import org.apache.geode.internal.logging.log4j.LocalizedMessage; import org.apache.geode.internal.logging.log4j.LogMarker; import org.apache.geode.internal.util.StopWatch; -import org.apache.logging.log4j.Logger; - -import java.io.*; -import java.util.*; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; /** * Specialized {@link CacheDistributionAdvisor} for {@link BucketRegion BucketRegions}. The @@ -1452,7 +1466,7 @@ public class BucketAdvisor extends CacheDistributionAdvisor { for (;;) { // bail out if the system starts closing this.getAdvisee().getCancelCriterion().checkCancelInProgress(null); - final GemFireCacheImpl cache = (GemFireCacheImpl) getBucket().getCache(); + final InternalCache cache = getBucket().getCache(); if (cache != null && cache.isCacheAtShutdownAll()) { throw new CacheClosedException("Cache is shutting down"); } @@ -1727,9 +1741,9 @@ public class BucketAdvisor extends CacheDistributionAdvisor { @Override protected Profile instantiateProfile(InternalDistributedMember memberId, int version) { if (!this.pRegion.isShadowPR()) { - GemFireCacheImpl c = getProxyBucketRegion().getCache(); + InternalCache cache = getProxyBucketRegion().getCache(); List servers = null; - servers = c.getCacheServers(); + servers = cache.getCacheServers(); HashSet<BucketServerLocation66> serverLocations = new HashSet<BucketServerLocation66>(); for (Object object : servers) {