http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegionFunctionStreamingMessage.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegionFunctionStreamingMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegionFunctionStreamingMessage.java index dacf8f5..9d85008 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegionFunctionStreamingMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegionFunctionStreamingMessage.java @@ -106,7 +106,7 @@ public class DistributedRegionFunctionStreamingMessage extends DistributionMessa if (this.txUniqId == TXManagerImpl.NOTX) { return null; } else { - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); + InternalCache cache = GemFireCacheImpl.getInstance(); if (cache == null) { // ignore and return, we are shutting down! return null; @@ -116,9 +116,9 @@ public class DistributedRegionFunctionStreamingMessage extends DistributionMessa } } - private void cleanupTransasction(TXStateProxy tx) { + private void cleanupTransaction(TXStateProxy tx) { if (this.txUniqId != TXManagerImpl.NOTX) { - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); + InternalCache cache = GemFireCacheImpl.getInstance(); if (cache == null) { // ignore and return, we are shutting down! return; @@ -130,7 +130,6 @@ public class DistributedRegionFunctionStreamingMessage extends DistributionMessa @Override protected void process(final DistributionManager dm) { - Throwable thr = null; boolean sendReply = true; DistributedRegion dr = null; @@ -202,7 +201,7 @@ public class DistributedRegionFunctionStreamingMessage extends DistributionMessa logger.trace(LogMarker.DM, "Exception caught while processing message", t); } } finally { - cleanupTransasction(tx); + cleanupTransaction(tx); if (sendReply && this.processorId != 0) { ReplyException rex = null; if (thr != null) { @@ -275,9 +274,9 @@ public class DistributedRegionFunctionStreamingMessage extends DistributionMessa /** * check to see if the cache is closing */ - final public boolean checkCacheClosing(DistributionManager dm) { - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); - return (cache == null || cache.getCancelCriterion().isCancelInProgress()); + private boolean checkCacheClosing(DistributionManager dm) { + InternalCache cache = GemFireCacheImpl.getInstance(); + return cache == null || cache.getCancelCriterion().isCancelInProgress(); } /** @@ -285,7 +284,7 @@ public class DistributedRegionFunctionStreamingMessage extends DistributionMessa * * @return true if the distributed system is closing */ - final public boolean checkDSClosing(DistributionManager dm) { + private boolean checkDSClosing(DistributionManager dm) { InternalDistributedSystem ds = dm.getSystem(); return (ds == null || ds.isDisconnecting()); }
http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/internal/cache/DynamicRegionFactoryImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DynamicRegionFactoryImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DynamicRegionFactoryImpl.java index 130e2a8..81bb7fb 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DynamicRegionFactoryImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DynamicRegionFactoryImpl.java @@ -28,15 +28,17 @@ public class DynamicRegionFactoryImpl extends DynamicRegionFactory { * create an instance of the factory. This is normally only done by DynamicRegionFactory's static * initialization */ - public DynamicRegionFactoryImpl() {} + public DynamicRegionFactoryImpl() { + // nothing + } /** close the factory. Only do this if you're closing the cache, too */ public void close() { - _close(); + doClose(); } /** initialize the factory for use with a new cache */ - public void internalInit(GemFireCacheImpl c) throws CacheException { - _internalInit(c); + void internalInit(InternalCache cache) throws CacheException { + doInternalInit(cache); } } http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java index 02c0422..ac4954a 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java @@ -12,11 +12,33 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ - package org.apache.geode.internal.cache; -import org.apache.geode.*; -import org.apache.geode.cache.*; +import static org.apache.geode.internal.offheap.annotations.OffHeapIdentifier.*; + +import java.io.ByteArrayInputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.IOException; +import java.util.function.Function; + +import org.apache.logging.log4j.Logger; + +import org.apache.geode.CopyHelper; +import org.apache.geode.DataSerializer; +import org.apache.geode.DeltaSerializationException; +import org.apache.geode.GemFireIOException; +import org.apache.geode.InvalidDeltaException; +import org.apache.geode.SerializationException; +import org.apache.geode.SystemFailure; +import org.apache.geode.cache.EntryEvent; +import org.apache.geode.cache.EntryNotFoundException; +import org.apache.geode.cache.EntryOperation; +import org.apache.geode.cache.Operation; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.SerializedCacheValue; +import org.apache.geode.cache.TransactionId; import org.apache.geode.cache.query.IndexMaintenanceException; import org.apache.geode.cache.query.QueryException; import org.apache.geode.cache.query.internal.index.IndexManager; @@ -28,7 +50,14 @@ import org.apache.geode.distributed.DistributedSystem; import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.distributed.internal.DistributionMessage; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; -import org.apache.geode.internal.*; +import org.apache.geode.internal.Assert; +import org.apache.geode.internal.ByteArrayDataInput; +import org.apache.geode.internal.DSFIDFactory; +import org.apache.geode.internal.DataSerializableFixedID; +import org.apache.geode.internal.HeapDataOutputStream; +import org.apache.geode.internal.InternalDataSerializer; +import org.apache.geode.internal.Sendable; +import org.apache.geode.internal.Version; import org.apache.geode.internal.cache.FilterRoutingInfo.FilterInfo; import org.apache.geode.internal.cache.lru.Sizeable; import org.apache.geode.internal.cache.partitioned.PartitionMessage; @@ -43,46 +72,46 @@ import org.apache.geode.internal.lang.StringUtils; import org.apache.geode.internal.logging.LogService; import org.apache.geode.internal.logging.log4j.LocalizedMessage; import org.apache.geode.internal.logging.log4j.LogMarker; -import org.apache.geode.internal.offheap.*; +import org.apache.geode.internal.offheap.OffHeapHelper; +import org.apache.geode.internal.offheap.OffHeapRegionEntryHelper; +import org.apache.geode.internal.offheap.ReferenceCountHelper; +import org.apache.geode.internal.offheap.Releasable; +import org.apache.geode.internal.offheap.StoredObject; import org.apache.geode.internal.offheap.annotations.Released; import org.apache.geode.internal.offheap.annotations.Retained; import org.apache.geode.internal.offheap.annotations.Unretained; import org.apache.geode.internal.util.ArrayUtils; import org.apache.geode.internal.util.BlobHelper; import org.apache.geode.pdx.internal.PeerTypeRegistration; -import org.apache.logging.log4j.Logger; - -import java.io.*; -import java.util.function.Function; - -import static org.apache.geode.internal.offheap.annotations.OffHeapIdentifier.ENTRY_EVENT_NEW_VALUE; -import static org.apache.geode.internal.offheap.annotations.OffHeapIdentifier.ENTRY_EVENT_OLD_VALUE; /** * Implementation of an entry event + * + * must be public for DataSerializableFixedID */ -// must be public for DataSerializableFixedID public class EntryEventImpl implements EntryEvent, InternalCacheEvent, DataSerializableFixedID, EntryOperation, Releasable { private static final Logger logger = LogService.getLogger(); // PACKAGE FIELDS // public transient LocalRegion region; + private transient RegionEntry re; protected KeyInfo keyInfo; - // private long eventId; /** the event's id. Scoped by distributedMember. */ protected EventID eventID; private Object newValue = null; + /** * If we ever serialize the new value then it should be stored in this field in case we need the * serialized form again later. This was added to fix bug 43781. Note that we also have the * "newValueBytes" field. But it is only non-null if setSerializedNewValue was called. */ private byte[] cachedSerializedNewValue = null; + @Retained(ENTRY_EVENT_OLD_VALUE) private Object oldValue = null; @@ -116,15 +145,11 @@ public class EntryEventImpl */ protected DistributedMember distributedMember; - /** * transient storage for the message that caused the event */ transient DistributionMessage causedByMessage; - - // private static long eventID = 0; - /** * The originating membershipId of this event. * @@ -138,12 +163,12 @@ public class EntryEventImpl */ private byte[] deltaBytes = null; - /** routing information for cache clients for this event */ private FilterInfo filterInfo; /** new value stored in serialized form */ protected byte[] newValueBytes; + /** old value stored in serialized form */ private byte[] oldValueBytes; @@ -157,7 +182,9 @@ public class EntryEventImpl public final static Object SUSPECT_TOKEN = new Object(); - public EntryEventImpl() {} + public EntryEventImpl() { + // do nothing + } /** * Reads the contents of this message from the given input. @@ -229,7 +256,7 @@ public class EntryEventImpl } this.txId = this.region.getTXId(); - /** + /* * this might set txId for events done from a thread that has a tx even though the op is non-tx. * For example region ops. */ @@ -341,9 +368,8 @@ public class EntryEventImpl @Retained(ENTRY_EVENT_NEW_VALUE) Object newValue, Object callbackArgument, boolean originRemote, DistributedMember distributedMember, boolean generateCallbacks, EventID eventID) { - EntryEventImpl entryEvent = new EntryEventImpl(region, op, key, newValue, callbackArgument, - originRemote, distributedMember, generateCallbacks, eventID); - return entryEvent; + return new EntryEventImpl(region, op, key, newValue, callbackArgument, originRemote, + distributedMember, generateCallbacks, eventID); } /** @@ -356,9 +382,8 @@ public class EntryEventImpl public static EntryEventImpl create(LocalRegion region, Operation op, Object key, boolean originRemote, DistributedMember distributedMember, boolean generateCallbacks, boolean fromRILocalDestroy) { - EntryEventImpl entryEvent = new EntryEventImpl(region, op, key, originRemote, distributedMember, - generateCallbacks, fromRILocalDestroy); - return entryEvent; + return new EntryEventImpl(region, op, key, originRemote, distributedMember, generateCallbacks, + fromRILocalDestroy); } /** @@ -374,9 +399,8 @@ public class EntryEventImpl public static EntryEventImpl create(final LocalRegion region, Operation op, Object key, @Retained(ENTRY_EVENT_NEW_VALUE) Object newVal, Object callbackArgument, boolean originRemote, DistributedMember distributedMember, boolean generateCallbacks, boolean initializeId) { - EntryEventImpl entryEvent = new EntryEventImpl(region, op, key, newVal, callbackArgument, - originRemote, distributedMember, generateCallbacks, initializeId); - return entryEvent; + return new EntryEventImpl(region, op, key, newVal, callbackArgument, originRemote, + distributedMember, generateCallbacks, initializeId); } /** @@ -915,7 +939,7 @@ public class EntryEventImpl public final Object getOldValueAsOffHeapDeserializedOrRaw() { Object result = basicGetOldValue(); if (mayHaveOffHeapReferences() && result instanceof StoredObject) { - result = ((StoredObject) result).getDeserializedForReading(); + result = ((CachedDeserializable) result).getDeserializedForReading(); } return AbstractRegion.handleNotAvailable(result); // fixes 49499 } @@ -1289,7 +1313,7 @@ public class EntryEventImpl public final Object getNewValueAsOffHeapDeserializedOrRaw() { Object result = getRawNewValue(); if (mayHaveOffHeapReferences() && result instanceof StoredObject) { - result = ((StoredObject) result).getDeserializedForReading(); + result = ((CachedDeserializable) result).getDeserializedForReading(); } return AbstractRegion.handleNotAvailable(result); // fixes 49499 } @@ -1462,8 +1486,6 @@ public class EntryEventImpl * hasn't been set yet. * * @param oldValueForDelta Used by Delta Propagation feature - * - * @throws RegionClearedException */ void putExistingEntry(final LocalRegion owner, final RegionEntry reentry, boolean requireOldValue, Object oldValueForDelta) throws RegionClearedException { @@ -1524,8 +1546,6 @@ public class EntryEventImpl /** * Put a newValue into the given, write synced, new, region entry. - * - * @throws RegionClearedException */ void putNewEntry(final LocalRegion owner, final RegionEntry reentry) throws RegionClearedException { @@ -1791,7 +1811,7 @@ public class EntryEventImpl OffHeapHelper.releaseWithNoTracking(v); } } - } catch (EntryNotFoundException ex) { + } catch (EntryNotFoundException ignore) { return false; } } @@ -2012,7 +2032,7 @@ public class EntryEventImpl synchronized (this.offHeapLock) { ArrayUtils.objectStringNonRecursive(basicGetOldValue(), buf); } - } catch (IllegalStateException ex) { + } catch (IllegalStateException ignore) { buf.append("OFFHEAP_VALUE_FREED"); } buf.append(";newValue="); @@ -2020,7 +2040,7 @@ public class EntryEventImpl synchronized (this.offHeapLock) { ArrayUtils.objectStringNonRecursive(basicGetNewValue(), buf); } - } catch (IllegalStateException ex) { + } catch (IllegalStateException ignore) { buf.append("OFFHEAP_VALUE_FREED"); } buf.append(";callbackArg="); @@ -2029,10 +2049,6 @@ public class EntryEventImpl buf.append(isOriginRemote()); buf.append(";originMember="); buf.append(getDistributedMember()); - // if (this.partitionMessage != null) { - // buf.append("; partitionMessage="); - // buf.append(this.partitionMessage); - // } if (this.isPossibleDuplicate()) { buf.append(";posDup"); } @@ -2054,11 +2070,8 @@ public class EntryEventImpl buf.append(this.eventID); } if (this.deltaBytes != null) { - buf.append(";[" + this.deltaBytes.length + " deltaBytes]"); + buf.append(";[").append(this.deltaBytes.length).append(" deltaBytes]"); } - // else { - // buf.append(";[no deltaBytes]"); - // } if (this.filterInfo != null) { buf.append(";routing="); buf.append(this.filterInfo); @@ -2239,8 +2252,6 @@ public class EntryEventImpl /** * Sets the operation type. - * - * @param eventType */ public void setEventType(EnumListenerEvent eventType) { this.eventType = eventType; @@ -2416,8 +2427,6 @@ public class EntryEventImpl /** * This method sets the delta bytes used in Delta Propagation feature. <B>For internal delta, see * setNewValue().</B> - * - * @param deltaBytes */ public void setDeltaBytes(byte[] deltaBytes) { this.deltaBytes = deltaBytes; @@ -2494,7 +2503,6 @@ public class EntryEventImpl * this method joins together version tag timestamps and the "lastModified" timestamps generated * and stored in entries. If a change does not already carry a lastModified timestamp * - * @param suggestedTime * @return the timestamp to store in the entry */ public long getEventTime(long suggestedTime) { @@ -2741,10 +2749,10 @@ public class EntryEventImpl // System.identityHashCode(ov)); if (ReferenceCountHelper.trackReferenceCounts()) { ReferenceCountHelper.setReferenceCountOwner(new OldValueOwner()); - ((StoredObject) ov).release(); + ((Releasable) ov).release(); ReferenceCountHelper.setReferenceCountOwner(null); } else { - ((StoredObject) ov).release(); + ((Releasable) ov).release(); } } OffHeapHelper.releaseAndTrackOwner(nv, this); http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java index 4498b36..87835ff 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java @@ -14,6 +14,21 @@ */ package org.apache.geode.internal.cache; +import java.io.ByteArrayInputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.logging.log4j.Logger; + import org.apache.geode.DataSerializer; import org.apache.geode.InternalGemFireException; import org.apache.geode.distributed.DistributedMember; @@ -30,21 +45,13 @@ import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.LogService; import org.apache.geode.internal.logging.log4j.LogMarker; import org.apache.geode.internal.util.Breadcrumbs; -import org.apache.logging.log4j.Logger; - -import java.io.*; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.concurrent.atomic.AtomicLong; /** * This class uniquely identifies any Region Operation like create, update destroy etc. It is * composed of three parts , namely :- 1) DistributedMembershipID 2) ThreadID 3) SequenceID This * helps in sequencing the events belonging to a unique producer. - * - * */ -public final class EventID implements DataSerializableFixedID, Serializable, Externalizable { +public class EventID implements DataSerializableFixedID, Serializable, Externalizable { private static final Logger logger = LogService.getLogger(); /** turns on very verbose logging ove membership id bytes */ http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java index 278367c..2c86aed 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java @@ -12,11 +12,21 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -/** - * - */ package org.apache.geode.internal.cache; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.logging.log4j.Logger; + import org.apache.geode.DataSerializable; import org.apache.geode.DataSerializer; import org.apache.geode.cache.client.PoolFactory; @@ -31,14 +41,6 @@ import org.apache.geode.internal.cache.versions.VersionTag; import org.apache.geode.internal.logging.LogService; import org.apache.geode.internal.logging.log4j.LogMarker; import org.apache.geode.internal.util.concurrent.StoppableCountDownLatch; -import org.apache.logging.log4j.Logger; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; /** * EventTracker tracks the last sequence number for a particular memberID:threadID. It is used to @@ -86,11 +88,10 @@ public class EventTracker { */ private volatile InternalDistributedMember initialImageProvider; - /** * The cache associated with this tracker */ - GemFireCacheImpl cache; + InternalCache cache; /** * The name of this tracker @@ -110,12 +111,12 @@ public class EventTracker { /** * Initialize the EventTracker's timer task. This is stored in the cache for tracking and shutdown * purposes - * + * * @param cache the cache to schedule tasks with */ - public static ExpiryTask startTrackerServices(GemFireCacheImpl cache) { + public static ExpiryTask startTrackerServices(InternalCache cache) { long expiryTime = Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "messageTrackingTimeout", - PoolFactory.DEFAULT_SUBSCRIPTION_MESSAGE_TRACKING_TIMEOUT / 3).longValue(); + PoolFactory.DEFAULT_SUBSCRIPTION_MESSAGE_TRACKING_TIMEOUT / 3); ExpiryTask result = new ExpiryTask(cache, expiryTime); cache.getCCPTimer().scheduleAtFixedRate(result, expiryTime, expiryTime); // schedule(result, expiryTime); @@ -124,10 +125,10 @@ public class EventTracker { /** * Terminate the tracker's timer task - * + * * @param cache the cache holding the tracker task */ - public static void stopTrackerServices(GemFireCacheImpl cache) { + public static void stopTrackerServices(InternalCache cache) { cache.getEventTrackerTask().cancel(); } @@ -507,8 +508,6 @@ public class EventTracker { } /** - * @param event - * @param eventID * @return true if the event should not be tracked, false otherwise */ private boolean ignoreEvent(InternalCacheEvent event, EventID eventID) { @@ -675,17 +674,21 @@ public class EventTracker { * Whether this object was removed by the cleanup thread. */ public boolean removed; + /** * public for tests only */ public Map<EventID, VersionTag> entryVersionTags = new HashMap<EventID, VersionTag>(); + /** millisecond timestamp */ transient long endOfLifeTimer; /** * creates a new instance to save status of a putAllOperation */ - BulkOpHolder() {} + BulkOpHolder() { + // do nothing + } public void putVersionTag(EventID eventId, VersionTag versionTag) { entryVersionTags.put(eventId, versionTag); @@ -699,13 +702,13 @@ public class EventTracker { } } - static class ExpiryTask extends SystemTimerTask { + public static class ExpiryTask extends SystemTimerTask { - GemFireCacheImpl cache; + InternalCache cache; long expiryTime; List trackers = new LinkedList(); - public ExpiryTask(GemFireCacheImpl cache, long expiryTime) { + public ExpiryTask(InternalCache cache, long expiryTime) { this.cache = cache; this.expiryTime = expiryTime; } http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/internal/cache/ExpiryTask.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ExpiryTask.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ExpiryTask.java index d3f5987..14edad9 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/ExpiryTask.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ExpiryTask.java @@ -12,17 +12,23 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ - package org.apache.geode.internal.cache; -/** - * ExpiryTask represents a timeout event for expiration - */ +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; + +import org.apache.logging.log4j.Logger; import org.apache.geode.CancelException; import org.apache.geode.InternalGemFireError; import org.apache.geode.SystemFailure; -import org.apache.geode.cache.*; +import org.apache.geode.cache.CacheException; +import org.apache.geode.cache.EntryNotFoundException; +import org.apache.geode.cache.ExpirationAction; +import org.apache.geode.cache.ExpirationAttributes; +import org.apache.geode.cache.RegionDestroyedException; import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.distributed.internal.PooledExecutorWithDMStats; @@ -32,13 +38,10 @@ import org.apache.geode.internal.logging.LogService; import org.apache.geode.internal.logging.LoggingThreadGroup; import org.apache.geode.internal.logging.log4j.LocalizedMessage; import org.apache.geode.internal.tcp.ConnectionTable; -import org.apache.logging.log4j.Logger; - -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; +/** + * ExpiryTask represents a timeout event for expiration + */ public abstract class ExpiryTask extends SystemTimer.SystemTimerTask { private static final Logger logger = LogService.getLogger(); @@ -49,8 +52,7 @@ public abstract class ExpiryTask extends SystemTimer.SystemTimerTask { static { // default to inline expiry to fix bug 37115 - int nThreads = - Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "EXPIRY_THREADS", 0).intValue(); + int nThreads = Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "EXPIRY_THREADS", 0); if (nThreads > 0) { ThreadFactory tf = new ThreadFactory() { private int nextId = 0; @@ -396,7 +398,7 @@ public abstract class ExpiryTask extends SystemTimer.SystemTimerTask { } protected boolean isCacheClosing() { - return ((GemFireCacheImpl) getLocalRegion().getCache()).isClosed(); + return getLocalRegion().getCache().isClosed(); } /** @@ -464,7 +466,7 @@ public abstract class ExpiryTask extends SystemTimer.SystemTimerTask { } private static long calculateNow() { - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); + InternalCache cache = GemFireCacheImpl.getInstance(); if (cache != null) { // Use cache.cacheTimeMillis here. See bug 52267. InternalDistributedSystem ids = cache.getInternalDistributedSystem(); http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java b/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java index e7175f3..9a4eca3 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java @@ -47,6 +47,7 @@ import org.apache.geode.cache.query.internal.cq.CqService; import org.apache.geode.cache.query.internal.cq.CqServiceProvider; import org.apache.geode.cache.query.internal.cq.ServerCQ; import org.apache.geode.distributed.DistributedMember; +import org.apache.geode.distributed.internal.DistributionAdvisee; import org.apache.geode.distributed.internal.DistributionAdvisor.Profile; import org.apache.geode.distributed.internal.DistributionManager; import org.apache.geode.distributed.internal.HighPriorityDistributionMessage; @@ -88,7 +89,7 @@ public class FilterProfile implements DataSerializableFixedID { private static final Logger logger = LogService.getLogger(); /** enumeration of distributed profile operations */ - static enum operationType { + enum operationType { REGISTER_KEY, REGISTER_KEYS, REGISTER_PATTERN, @@ -201,8 +202,6 @@ public class FilterProfile implements DataSerializableFixedID { * used for instantiation of a profile associated with a region and not describing region filters * in a different process. Do not use this method when instantiating profiles to store in * distribution advisor profiles. - * - * @param r */ public FilterProfile(LocalRegion r) { this.region = r; @@ -499,7 +498,6 @@ public class FilterProfile implements DataSerializableFixedID { /** * Registers interest in a set of keys for a client * - * @param inputClientID * @param keys The list of keys in which to register interest * @param updatesAsInvalidates whether to send invalidations instead of updates * @return the registered keys @@ -796,7 +794,7 @@ public class FilterProfile implements DataSerializableFixedID { public void stopCq(ServerCQ cq) { ensureCqID(cq); if (logger.isDebugEnabled()) { - this.logger.debug("Stopping CQ {} on this members FilterProfile.", cq.getServerCqName()); + logger.debug("Stopping CQ {} on this members FilterProfile.", cq.getServerCqName()); } this.sendCQProfileOperation(operationType.STOP_CQ, cq); } @@ -919,7 +917,7 @@ public class FilterProfile implements DataSerializableFixedID { if (clientId.equals(client)) { try { cq.close(false); - } catch (Exception ex) { + } catch (Exception ignore) { if (logger.isDebugEnabled()) { logger.debug("Failed to remove CQ from the base region. CqName : {}", cq.getName()); } @@ -970,7 +968,7 @@ public class FilterProfile implements DataSerializableFixedID { } OperationMessage msg = new OperationMessage(); msg.regionName = this.region.getFullPath(); - msg.clientID = clientID.longValue(); + msg.clientID = clientID; msg.opType = opType; msg.interest = interest; msg.updatesAsInvalidates = updatesAsInvalidates; @@ -980,14 +978,14 @@ public class FilterProfile implements DataSerializableFixedID { private void sendFilterProfileOperation(OperationMessage msg) { Set recipients = - ((CacheDistributionAdvisee) this.region).getDistributionAdvisor().adviseProfileUpdate(); + ((DistributionAdvisee) this.region).getDistributionAdvisor().adviseProfileUpdate(); msg.setRecipients(recipients); ReplyProcessor21 rp = new ReplyProcessor21(this.region.getDistributionManager(), recipients); msg.processorId = rp.getProcessorId(); this.region.getDistributionManager().putOutgoing(msg); try { rp.waitForReplies(); - } catch (InterruptedException ie) { + } catch (InterruptedException ignore) { Thread.currentThread().interrupt(); } } @@ -1014,9 +1012,10 @@ public class FilterProfile implements DataSerializableFixedID { } static final Profile[] NO_PROFILES = new Profile[0]; + private final CacheProfile localProfile = new CacheProfile(this); - private final Profile[] localProfileArray = new Profile[] {localProfile}; + private final Profile[] localProfileArray = new Profile[] {localProfile}; /** compute local routing information */ public FilterInfo getLocalFilterRouting(CacheEvent event) { @@ -1061,7 +1060,7 @@ public class FilterProfile implements DataSerializableFixedID { // bug #50809 - local routing for transactional ops must be done here // because the event isn't available later and we lose the old value for the entry final boolean processLocalProfile = - event.getOperation().isEntry() && ((EntryEventImpl) event).getTransactionId() != null; + event.getOperation().isEntry() && ((EntryEvent) event).getTransactionId() != null; fillInCQRoutingInfo(event, processLocalProfile, peerProfiles, frInfo); } @@ -1106,7 +1105,7 @@ public class FilterProfile implements DataSerializableFixedID { } fillInCQRoutingInfo(event, true, NO_PROFILES, result); } - result = fillInInterestRoutingInfo(event, localProfileArray, result, Collections.EMPTY_SET); + result = fillInInterestRoutingInfo(event, localProfileArray, result, Collections.emptySet()); } return result; } @@ -1275,8 +1274,8 @@ public class FilterProfile implements DataSerializableFixedID { public FilterRoutingInfo fillInInterestRoutingInfo(CacheEvent event, Profile[] profiles, FilterRoutingInfo filterRoutingInfo, Set cacheOpRecipients) { - Set clientsInv = Collections.EMPTY_SET; - Set clients = Collections.EMPTY_SET; + Set clientsInv = Collections.emptySet(); + Set clients = Collections.emptySet(); if (logger.isTraceEnabled(LogMarker.BRIDGE_SERVER)) { logger.trace(LogMarker.BRIDGE_SERVER, "finding interested clients for {}", event); @@ -1310,8 +1309,8 @@ public class FilterProfile implements DataSerializableFixedID { if (!pf.isLocalProfile() && cacheOpRecipients.contains(cf.getDistributedMember())) { if (frInfo == null) frInfo = new FilterRoutingInfo(); - frInfo.addInterestedClients(cf.getDistributedMember(), Collections.EMPTY_SET, - Collections.EMPTY_SET, false); + frInfo.addInterestedClients(cf.getDistributedMember(), Collections.emptySet(), + Collections.emptySet(), false); } continue; } @@ -1567,11 +1566,10 @@ public class FilterProfile implements DataSerializableFixedID { } private Set<Object> getAllKeyClients() { - Set allKeysRef = this.allKeyClients; if (testHook != null) { testHook.await(); } - return allKeysRef; + return (Set) this.allKeyClients; } public int getAllKeyClientsSize() { @@ -1627,7 +1625,7 @@ public class FilterProfile implements DataSerializableFixedID { return "clients[]"; } Set<Long> sorted = new TreeSet(wids.keySet()); - StringBuffer result = new StringBuffer(sorted.size() * 70); + StringBuilder result = new StringBuilder(sorted.size() * 70); result.append("clients["); Iterator<Long> it = sorted.iterator(); for (int i = 1; it.hasNext(); i++) { @@ -1652,7 +1650,7 @@ public class FilterProfile implements DataSerializableFixedID { return "cqs[]"; } Set<Long> sorted = new TreeSet(wids.keySet()); - StringBuffer result = new StringBuffer(sorted.size() * 70); + StringBuilder result = new StringBuilder(sorted.size() * 70); result.append("cqs["); Iterator<Long> it = sorted.iterator(); for (int i = 1; it.hasNext(); i++) { @@ -1666,7 +1664,6 @@ public class FilterProfile implements DataSerializableFixedID { return result.toString(); } - /** * given a collection of on-wire identifiers, this returns a set of the client/server identifiers * for each client or durable queue @@ -1730,7 +1727,7 @@ public class FilterProfile implements DataSerializableFixedID { return new LinkedList(this.filterProfileMsgQueue.get(member)); } } - return Collections.EMPTY_LIST; + return Collections.emptyList(); } /** @@ -1746,14 +1743,11 @@ public class FilterProfile implements DataSerializableFixedID { return new LinkedList(this.filterProfileMsgQueue.remove(member)); } } - return Collections.EMPTY_LIST; + return Collections.emptyList(); } /** * Adds the message to filter profile queue. - * - * @param member - * @param message */ public void addToFilterProfileQueue(InternalDistributedMember member, OperationMessage message) { if (logger.isDebugEnabled()) { @@ -1771,8 +1765,6 @@ public class FilterProfile implements DataSerializableFixedID { /** * Process the filter profile messages. - * - * @param msgs */ public void processQueuedFilterProfileMsgs(List msgs) { final boolean isDebugEnabled = logger.isDebugEnabled(); @@ -1840,7 +1832,7 @@ public class FilterProfile implements DataSerializableFixedID { "No cache profile to update, adding filter profile message to queue. Message :{}", this); } - FilterProfile localFP = ((PartitionedRegion) r).getFilterProfile(); + FilterProfile localFP = ((LocalRegion) r).getFilterProfile(); localFP.addToFilterProfileQueue(getSender(), this); dm.getCancelCriterion().checkCancelInProgress(null); } else { @@ -1865,7 +1857,7 @@ public class FilterProfile implements DataSerializableFixedID { reply.setRecipient(getSender()); try { dm.putOutgoing(reply); - } catch (CancelException e) { + } catch (CancelException ignore) { // can't send a reply, so ignore the exception } } @@ -1927,7 +1919,7 @@ public class FilterProfile implements DataSerializableFixedID { private CacheDistributionAdvisee findRegion() { CacheDistributionAdvisee result = null; - GemFireCacheImpl cache = null; + InternalCache cache; try { cache = GemFireCacheImpl.getInstance(); if (cache != null) { @@ -1936,7 +1928,7 @@ public class FilterProfile implements DataSerializableFixedID { result = (CacheDistributionAdvisee) lr; } } - } catch (CancelException e) { + } catch (CancelException ignore) { // nothing to do } return result; @@ -2023,7 +2015,7 @@ public class FilterProfile implements DataSerializableFixedID { if (nextID == Integer.MAX_VALUE) { this.hasLongID = true; } - result = Long.valueOf(nextID++); + result = nextID++; this.realIDs.put(realId, result); this.wireIDs.put(result, realId); } http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/internal/cache/FilterRoutingInfo.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/FilterRoutingInfo.java b/geode-core/src/main/java/org/apache/geode/internal/cache/FilterRoutingInfo.java index 279a4d1..0dd24f6 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/FilterRoutingInfo.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/FilterRoutingInfo.java @@ -14,18 +14,6 @@ */ package org.apache.geode.internal.cache; -import org.apache.geode.DataSerializable; -import org.apache.geode.DataSerializer; -import org.apache.geode.InternalGemFireError; -import org.apache.geode.distributed.DistributedMember; -import org.apache.geode.distributed.internal.InternalDistributedSystem; -import org.apache.geode.distributed.internal.membership.InternalDistributedMember; -import org.apache.geode.internal.HeapDataOutputStream; -import org.apache.geode.internal.InternalDataSerializer; -import org.apache.geode.internal.Version; -import org.apache.geode.internal.VersionedDataInputStream; -import org.apache.geode.internal.VersionedDataSerializable; - import java.io.ByteArrayInputStream; import java.io.DataInput; import java.io.DataInputStream; @@ -38,6 +26,18 @@ import java.util.Iterator; import java.util.Map; import java.util.Set; +import org.apache.geode.DataSerializer; +import org.apache.geode.InternalGemFireError; +import org.apache.geode.distributed.DistributedMember; +import org.apache.geode.distributed.internal.InternalDistributedSystem; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.HeapDataOutputStream; +import org.apache.geode.internal.InternalDataSerializer; +import org.apache.geode.internal.ObjToByteArraySerializer; +import org.apache.geode.internal.Version; +import org.apache.geode.internal.VersionedDataInputStream; +import org.apache.geode.internal.VersionedDataSerializable; + /** * This class is used to hold the information about the servers and their Filters (CQs and Interest * List) that are satisfied by the cache update operation. @@ -217,7 +217,7 @@ public class FilterRoutingInfo implements VersionedDataSerializable { /** DataSerializable methods */ public void fromData(DataInput in) throws IOException, ClassNotFoundException { DistributedMember myID = null; - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); + InternalCache cache = GemFireCacheImpl.getInstance(); if (cache != null) { myID = cache.getMyId(); } @@ -250,7 +250,7 @@ public class FilterRoutingInfo implements VersionedDataSerializable { public void fromDataPre_GFE_7_1_0_0(DataInput in) throws IOException, ClassNotFoundException { DistributedMember myID = null; - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); + InternalCache cache = GemFireCacheImpl.getInstance(); if (cache != null) { myID = cache.getMyId(); } @@ -303,7 +303,7 @@ public class FilterRoutingInfo implements VersionedDataSerializable { public boolean longIDs; - public static final long serialVersionUID = 0; + private static final long serialVersionUID = 0; /** Map holding Cq filterID and CqEvent Type */ private HashMap<Long, Integer> cqs; @@ -383,21 +383,20 @@ public class FilterRoutingInfo implements VersionedDataSerializable { Map.Entry e = (Map.Entry) it.next(); // most cq IDs and all event types are small ints, so we use an optimized // write that serializes 7 bits at a time in a compact form - InternalDataSerializer.writeUnsignedVL(((Long) e.getKey()).longValue(), hdos); - InternalDataSerializer.writeUnsignedVL(((Integer) e.getValue()).intValue(), hdos); + InternalDataSerializer.writeUnsignedVL((Long) e.getKey(), hdos); + InternalDataSerializer.writeUnsignedVL((Integer) e.getValue(), hdos); } } InternalDataSerializer.writeSetOfLongs(this.interestedClients, this.longIDs, hdos); InternalDataSerializer.writeSetOfLongs(this.interestedClientsInv, this.longIDs, hdos); if (out instanceof HeapDataOutputStream) { - ((HeapDataOutputStream) out).writeAsSerializedByteArray(hdos); + ((ObjToByteArraySerializer) out).writeAsSerializedByteArray(hdos); } else { byte[] myData = hdos.toByteArray(); DataSerializer.writeByteArray(myData, out); } } - public void fromDataPre_GFE_8_0_0_0(DataInput in) throws IOException, ClassNotFoundException { if (OLD_MEMBERS_OPTIMIZED) { this.myDataVersion = InternalDataSerializer.getVersionForDataStreamOrNull(in); @@ -422,14 +421,14 @@ public class FilterRoutingInfo implements VersionedDataSerializable { Map.Entry e = (Map.Entry) it.next(); // most cq IDs and all event types are small ints, so we use an optimized // write that serializes 7 bits at a time in a compact form - InternalDataSerializer.writeUnsignedVL(((Long) e.getKey()).longValue(), hdos); - InternalDataSerializer.writeUnsignedVL(((Integer) e.getValue()).intValue(), hdos); + InternalDataSerializer.writeUnsignedVL((Long) e.getKey(), hdos); + InternalDataSerializer.writeUnsignedVL((Integer) e.getValue(), hdos); } } InternalDataSerializer.writeSetOfLongs(this.interestedClients, this.longIDs, hdos); InternalDataSerializer.writeSetOfLongs(this.interestedClientsInv, this.longIDs, hdos); if (out instanceof HeapDataOutputStream) { - ((HeapDataOutputStream) out).writeAsSerializedByteArray(hdos); + ((ObjToByteArraySerializer) out).writeAsSerializedByteArray(hdos); } else { byte[] myData = hdos.toByteArray(); DataSerializer.writeByteArray(myData, out); @@ -494,7 +493,7 @@ public class FilterRoutingInfo implements VersionedDataSerializable { this.cqs = new HashMap(numEntries); for (int i = 0; i < numEntries; i++) { Long key = InternalDataSerializer.readUnsignedVL(dis); - Integer value = Integer.valueOf((int) InternalDataSerializer.readUnsignedVL(dis)); + Integer value = (int) InternalDataSerializer.readUnsignedVL(dis); this.cqs.put(key, value); } } @@ -506,10 +505,9 @@ public class FilterRoutingInfo implements VersionedDataSerializable { } } - @Override public String toString() { - StringBuffer sb = new StringBuffer(); + StringBuilder sb = new StringBuilder(); if (this.interestedClients != null && this.interestedClients.size() > 0) { sb.append("interestedClients:"); sb.append(this.interestedClients); @@ -533,4 +531,3 @@ public class FilterRoutingInfo implements VersionedDataSerializable { } } - http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/internal/cache/FindDurableQueueProcessor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/FindDurableQueueProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/FindDurableQueueProcessor.java index 1145687..71423e3 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/FindDurableQueueProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/FindDurableQueueProcessor.java @@ -12,7 +12,6 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ - package org.apache.geode.internal.cache; import java.io.DataInput; @@ -27,7 +26,6 @@ import java.util.Set; import org.apache.logging.log4j.Logger; import org.apache.geode.DataSerializer; -import org.apache.geode.cache.Cache; import org.apache.geode.distributed.internal.DM; import org.apache.geode.distributed.internal.DistributionManager; import org.apache.geode.distributed.internal.DistributionMessage; @@ -50,14 +48,11 @@ import org.apache.geode.internal.logging.LogService; public class FindDurableQueueProcessor extends ReplyProcessor21 { private static final Logger logger = LogService.getLogger(); - ////////// Public static entry point ///////// - final ArrayList durableLocations = new ArrayList(); - // @todo gregp: add javadocs public static ArrayList sendAndFind(ServerLocator locator, ClientProxyMembershipID proxyId, DM dm) { - Set members = ((ControllerAdvisor) locator.getDistributionAdvisor()).adviseBridgeServers(); + Set members = ((GridAdvisor) locator.getDistributionAdvisor()).adviseBridgeServers(); if (members.contains(dm.getId())) { // Don't send message to local server, see #50534. Set remoteMembers = new HashSet(members); @@ -80,9 +75,9 @@ public class FindDurableQueueProcessor extends ReplyProcessor21 { private static void findLocalDurableQueues(ClientProxyMembershipID proxyId, ArrayList<ServerLocation> matches) { - Cache c = GemFireCacheImpl.getInstance(); - if (c != null) { - List l = c.getCacheServers(); + InternalCache cache = GemFireCacheImpl.getInstance(); + if (cache != null) { + List l = cache.getCacheServers(); if (l != null) { Iterator i = l.iterator(); while (i.hasNext()) { @@ -96,9 +91,6 @@ public class FindDurableQueueProcessor extends ReplyProcessor21 { } } - - //////////// Instance methods ////////////// - @Override public void process(DistributionMessage msg) { // TODO Auto-generated method stub @@ -112,7 +104,6 @@ public class FindDurableQueueProcessor extends ReplyProcessor21 { super.process(msg); } - /** * Creates a new instance of FindDurableQueueProcessor */ @@ -120,9 +111,6 @@ public class FindDurableQueueProcessor extends ReplyProcessor21 { super(dm, members); } - - /////////////// Inner message classes ////////////////// - public static class FindDurableQueueMessage extends PooledDistributionMessage implements MessageWithReply { private int processorId; @@ -149,14 +137,12 @@ public class FindDurableQueueProcessor extends ReplyProcessor21 { return this.proxyId; } - @Override protected void process(final DistributionManager dm) { ArrayList<ServerLocation> matches = new ArrayList<ServerLocation>(); try { findLocalDurableQueues(proxyId, matches); - } finally { FindDurableQueueReply reply = new FindDurableQueueReply(); reply.setProcessorId(this.getProcessorId()); @@ -169,7 +155,6 @@ public class FindDurableQueueProcessor extends ReplyProcessor21 { dm.putOutgoing(reply); } } - } public int getDSFID() { @@ -205,7 +190,6 @@ public class FindDurableQueueProcessor extends ReplyProcessor21 { } } - public static class FindDurableQueueReply extends ReplyMessage { protected ArrayList matches = null; @@ -239,4 +223,3 @@ public class FindDurableQueueProcessor extends ReplyProcessor21 { } } } - http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/internal/cache/FindRemoteTXMessage.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/FindRemoteTXMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/FindRemoteTXMessage.java index 84e3009..3b89cfc 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/FindRemoteTXMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/FindRemoteTXMessage.java @@ -50,9 +50,12 @@ public class FindRemoteTXMessage extends HighPriorityDistributionMessage private static final Logger logger = LogService.getLogger(); private TXId txId; + private int processorId; - public FindRemoteTXMessage() {} + public FindRemoteTXMessage() { + // do nothing + } public FindRemoteTXMessage(TXId txid, int processorId, Set recipients) { super(); @@ -93,8 +96,7 @@ public class FindRemoteTXMessage extends HighPriorityDistributionMessage logger.debug("processing {}", this); } FindRemoteTXMessageReply reply = new FindRemoteTXMessageReply(); - GemFireCacheImpl cache = GemFireCacheImpl.getInstance();// .getExisting("Looking up - // CacheTransactionManager"); + InternalCache cache = GemFireCacheImpl.getInstance(); if (cache != null) { TXManagerImpl mgr = (TXManagerImpl) cache.getCacheTransactionManager(); mgr.waitForCompletingTransaction(txId); // in case there is a lost commit going on @@ -147,10 +149,8 @@ public class FindRemoteTXMessage extends HighPriorityDistributionMessage @Override public String toString() { - StringBuffer buff = new StringBuffer(); + StringBuilder buff = new StringBuilder(); String className = getClass().getName(); - // className.substring(className.lastIndexOf('.', className.lastIndexOf('.') - 1) + 1); // - // partition.<foo> more generic version buff.append(className.substring( className.indexOf(PartitionMessage.PN_TOKEN) + PartitionMessage.PN_TOKEN.length())); // partition.<foo> buff.append("(txId=").append(this.txId).append("; sender=").append(getSender()) @@ -173,7 +173,6 @@ public class FindRemoteTXMessage extends HighPriorityDistributionMessage this.processorId = in.readInt(); } - public static class FindRemoteTXMessageReplyProcessor extends ReplyProcessor21 { private InternalDistributedMember hostingMember; @@ -235,8 +234,6 @@ public class FindRemoteTXMessage extends HighPriorityDistributionMessage return true; } - - /** * Reply message for {@link FindRemoteTXMessage}. Reply is a boolean to indicate if the recipient * hosts or has recently hosted the tx state. If the member did host the txState previously, reply http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/internal/cache/FindVersionTagOperation.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/FindVersionTagOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/FindVersionTagOperation.java index 544a27e..199aafc 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/FindVersionTagOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/FindVersionTagOperation.java @@ -12,9 +12,6 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -/** - * - */ package org.apache.geode.internal.cache; import java.io.DataInput; @@ -38,9 +35,6 @@ import org.apache.geode.internal.InternalDataSerializer; import org.apache.geode.internal.cache.versions.VersionTag; import org.apache.geode.internal.logging.LogService; -/** - * - */ public class FindVersionTagOperation { private static final Logger logger = LogService.getLogger(); @@ -94,7 +88,6 @@ public class FindVersionTagOperation { public boolean stillWaiting() { return this.versionTag == null && super.stillWaiting(); } - } /** @@ -124,12 +117,6 @@ public class FindVersionTagOperation { /** for deserialization */ public FindVersionTagMessage() {} - /* - * (non-Javadoc) - * - * @see org.apache.geode.distributed.internal.DistributionMessage#process(org.apache.geode. - * distributed.internal.DistributionManager) - */ @Override protected void process(DistributionManager dm) { VersionTag result = null; @@ -169,7 +156,7 @@ public class FindVersionTagOperation { } private LocalRegion findRegion() { - GemFireCacheImpl cache = null; + InternalCache cache; try { cache = GemFireCacheImpl.getInstance(); if (cache != null) { http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java index 84e734c..4ed583a 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java @@ -119,7 +119,6 @@ import org.apache.geode.cache.asyncqueue.AsyncEventQueue; import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory; import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl; import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl; -import org.apache.geode.cache.client.ClientCache; import org.apache.geode.cache.client.ClientRegionFactory; import org.apache.geode.cache.client.ClientRegionShortcut; import org.apache.geode.cache.client.Pool; @@ -127,6 +126,8 @@ import org.apache.geode.cache.client.PoolFactory; import org.apache.geode.cache.client.PoolManager; import org.apache.geode.cache.client.internal.ClientMetadataService; import org.apache.geode.cache.client.internal.ClientRegionFactoryImpl; +import org.apache.geode.cache.client.internal.ConnectionImpl; +import org.apache.geode.cache.client.internal.InternalClientCache; import org.apache.geode.cache.client.internal.PoolImpl; import org.apache.geode.cache.control.ResourceManager; import org.apache.geode.cache.execute.FunctionService; @@ -239,8 +240,8 @@ import org.apache.geode.redis.GeodeRedisServer; * GemFire's implementation of a distributed {@link Cache}. */ @SuppressWarnings("deprecation") -public class GemFireCacheImpl - implements InternalCache, ClientCache, HasCachePerfStats, DistributionAdvisee, CacheTime { +public class GemFireCacheImpl implements InternalCache, InternalClientCache, HasCachePerfStats, + DistributionAdvisee, CacheTime { private static final Logger logger = LogService.getLogger(); /** The default number of seconds to wait for a distributed lock */ @@ -287,6 +288,8 @@ public class GemFireCacheImpl * (the default) then the size of the entry value is unchanged by a delta application. Not a final * so that tests can change this value. * + * TODO: move or static or encapsulate with interface methods + * * @since GemFire h****** 6.1.2.9 */ static boolean DELTAS_RECALCULATE_SIZE = @@ -580,10 +583,6 @@ public class GemFireCacheImpl private final Map<Class<? extends CacheService>, CacheService> services = new HashMap<>(); - public static final int DEFAULT_CLIENT_FUNCTION_TIMEOUT = 0; - - private static int clientFunctionTimeout; - private final SecurityService securityService = SecurityService.getSecurityService(); static { @@ -928,10 +927,12 @@ public class GemFireCacheImpl } // synchronized } + @Override public boolean isRESTServiceRunning() { return this.isRESTServiceRunning; } + @Override public void setRESTServiceRunning(boolean isRESTServiceRunning) { this.isRESTServiceRunning = isRESTServiceRunning; } @@ -941,6 +942,7 @@ public class GemFireCacheImpl * * @return RestAgent */ + @Override public RestAgent getRestAgent() { return this.restAgent; } @@ -1097,6 +1099,7 @@ public class GemFireCacheImpl * * @return true if the cache has pools declared */ + @Override public boolean hasPool() { return this.isClient || !getAllPools().isEmpty(); } @@ -1213,10 +1216,6 @@ public class GemFireCacheImpl startRestAgentServer(this); - int time = Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "CLIENT_FUNCTION_TIMEOUT", - DEFAULT_CLIENT_FUNCTION_TIMEOUT); - clientFunctionTimeout = time >= 0 ? time : DEFAULT_CLIENT_FUNCTION_TIMEOUT; - this.isInitialized = true; } @@ -1928,6 +1927,7 @@ public class GemFireCacheImpl close(false); } + @Override public void close(String reason, boolean keepAlive, boolean keepDS) { close(reason, null, keepAlive, keepDS); } @@ -1937,6 +1937,7 @@ public class GemFireCacheImpl close("Normal disconnect", null, keepAlive, false); } + @Override public void close(String reason, Throwable optionalCause) { close(reason, optionalCause, false, false); } @@ -1974,6 +1975,7 @@ public class GemFireCacheImpl * * @return the GatewaySender distributed lock service */ + @Override public DistributedLockService getGatewaySenderLockService() { if (this.gatewayLockService == null) { synchronized (this.gatewayLockServiceLock) { @@ -2492,22 +2494,25 @@ public class GemFireCacheImpl private final ConcurrentMap<String, DiskStoreImpl> regionOwnedDiskStores = new ConcurrentHashMap<>(); - void addDiskStore(DiskStoreImpl dsi) { + @Override + public void addDiskStore(DiskStoreImpl dsi) { this.diskStores.put(dsi.getName(), dsi); if (!dsi.isOffline()) { this.diskMonitor.addDiskStore(dsi); } } - void removeDiskStore(DiskStoreImpl dsi) { - this.diskStores.remove(dsi.getName()); - this.regionOwnedDiskStores.remove(dsi.getName()); + @Override + public void removeDiskStore(DiskStoreImpl diskStore) { + this.diskStores.remove(diskStore.getName()); + this.regionOwnedDiskStores.remove(diskStore.getName()); // Added for M&M - if (!dsi.getOwnedByRegion()) - this.system.handleResourceEvent(ResourceEvent.DISKSTORE_REMOVE, dsi); + if (!diskStore.getOwnedByRegion()) + this.system.handleResourceEvent(ResourceEvent.DISKSTORE_REMOVE, diskStore); } - void addRegionOwnedDiskStore(DiskStoreImpl dsi) { + @Override + public void addRegionOwnedDiskStore(DiskStoreImpl dsi) { this.regionOwnedDiskStores.put(dsi.getName(), dsi); if (!dsi.isOffline()) { this.diskMonitor.addDiskStore(dsi); @@ -2544,6 +2549,7 @@ public class GemFireCacheImpl return defaultDiskStoreName; } + // TODO: remove static from defaultDiskStoreName and move methods to InternalCache private static String defaultDiskStoreName = DiskStoreFactory.DEFAULT_DISK_STORE_NAME; @Override @@ -2579,13 +2585,13 @@ public class GemFireCacheImpl * @since GemFire prPersistSprint2 */ @Override - public Collection<DiskStoreImpl> listDiskStores() { + public Collection<DiskStore> listDiskStores() { return Collections.unmodifiableCollection(this.diskStores.values()); } @Override - public Collection<DiskStoreImpl> listDiskStoresIncludingRegionOwned() { - Collection<DiskStoreImpl> allDiskStores = new HashSet<>(); + public Collection<DiskStore> listDiskStoresIncludingRegionOwned() { + Collection<DiskStore> allDiskStores = new HashSet<>(); allDiskStores.addAll(this.diskStores.values()); allDiskStores.addAll(this.regionOwnedDiskStores.values()); return allDiskStores; @@ -2749,10 +2755,12 @@ public class GemFireCacheImpl return this.system.getSecurityInternalLogWriter(); } + @Override public InternalLogWriter getInternalLogWriter() { return this.system.getInternalLogWriter(); } + @Override public InternalLogWriter getSecurityInternalLogWriter() { return this.system.getSecurityInternalLogWriter(); } @@ -2762,7 +2770,8 @@ public class GemFireCacheImpl * * @return the sweeper task */ - EventTracker.ExpiryTask getEventTrackerTask() { + @Override + public EventTracker.ExpiryTask getEventTrackerTask() { return this.recordedEventSweeper; } @@ -2782,6 +2791,7 @@ public class GemFireCacheImpl * @param className Class name of the declarable * @return List of all instances of properties found for the given declarable */ + @Override public List<Properties> getDeclarableProperties(final String className) { List<Properties> propertiesList = new ArrayList<>(); synchronized (this.declarablePropertiesMap) { @@ -2800,6 +2810,7 @@ public class GemFireCacheImpl * @param declarable The declarable * @return Properties found for the given declarable */ + @Override public Properties getDeclarableProperties(final Declarable declarable) { return this.declarablePropertiesMap.get(declarable); } @@ -2998,11 +3009,12 @@ public class GemFireCacheImpl } } + // TODO: createVMRegion method is too complex for IDE to analyze @Override public <K, V> Region<K, V> createVMRegion(String name, RegionAttributes<K, V> p_attrs, InternalRegionArguments internalRegionArgs) throws RegionExistsException, TimeoutException, IOException, ClassNotFoundException { - // TODO: refactor overly complex method + if (getMyId().getVmKind() == DistributionManager.LOCATOR_DM_TYPE) { if (!internalRegionArgs.isUsedForMetaRegion() && internalRegionArgs.getInternalMetaRegion() == null) { @@ -3199,6 +3211,7 @@ public class GemFireCacheImpl return result; } + @Override public Set<LocalRegion> getApplicationRegions() { Set<LocalRegion> result = new HashSet<>(); synchronized (this.rootRegions) { @@ -3255,6 +3268,7 @@ public class GemFireCacheImpl return (LocalRegion) this.pathToRegion.get(path); } + @Override public LocalRegion getRegionByPathForProcessing(String path) { LocalRegion result = getRegionByPath(path); if (result == null) { @@ -3324,7 +3338,8 @@ public class GemFireCacheImpl } /** Return true if this region is initializing */ - boolean isGlobalRegionInitializing(String fullPath) { + @Override + public boolean isGlobalRegionInitializing(String fullPath) { this.stopper.checkCancelInProgress(null); int oldLevel = LocalRegion.setThreadInitLevelRequirement(LocalRegion.ANY_INIT); // go through // initialization latches @@ -3636,13 +3651,13 @@ public class GemFireCacheImpl } @Override - public void addRegionListener(RegionListener l) { - this.regionListeners.add(l); + public void addRegionListener(RegionListener regionListener) { + this.regionListeners.add(regionListener); } @Override - public void removeRegionListener(RegionListener l) { - this.regionListeners.remove(l); + public void removeRegionListener(RegionListener regionListener) { + this.regionListeners.remove(regionListener); } @Override @@ -3763,6 +3778,7 @@ public class GemFireCacheImpl return addCacheServer(false); } + @Override public CacheServer addCacheServer(boolean isGatewayReceiver) { if (isClient()) { throw new UnsupportedOperationException("operation is not supported on a client cache"); @@ -3776,6 +3792,7 @@ public class GemFireCacheImpl return cacheServer; } + @Override public void addGatewaySender(GatewaySender sender) { if (isClient()) { throw new UnsupportedOperationException("operation is not supported on a client cache"); @@ -3824,6 +3841,7 @@ public class GemFireCacheImpl } } + @Override public void removeGatewaySender(GatewaySender sender) { if (isClient()) { throw new UnsupportedOperationException("operation is not supported on a client cache"); @@ -3844,6 +3862,7 @@ public class GemFireCacheImpl } } + @Override public void addGatewayReceiver(GatewayReceiver receiver) { if (isClient()) { throw new UnsupportedOperationException("operation is not supported on a client cache"); @@ -3859,6 +3878,7 @@ public class GemFireCacheImpl } } + @Override public void addAsyncEventQueue(AsyncEventQueueImpl asyncQueue) { this.allAsyncEventQueues.add(asyncQueue); if (!asyncQueue.isMetaQueue()) { @@ -3923,6 +3943,7 @@ public class GemFireCacheImpl return null; } + @Override public void removeAsyncEventQueue(AsyncEventQueue asyncQueue) { if (isClient()) { throw new UnsupportedOperationException("operation is not supported on a client cache"); @@ -3975,6 +3996,7 @@ public class GemFireCacheImpl return cacheServersWithoutReceiver; } + @Override public List getCacheServersAndGatewayReceiver() { return this.allCacheServers; } @@ -3983,7 +4005,7 @@ public class GemFireCacheImpl * add a partitioned region to the set of tracked partitioned regions. This is used to notify the * regions when this cache requires, or does not require notification of all region/entry events. */ - void addPartitionedRegion(PartitionedRegion region) { + public void addPartitionedRegion(PartitionedRegion region) { synchronized (this.partitionedRegions) { if (region.isDestroyed()) { if (logger.isDebugEnabled()) { @@ -4121,7 +4143,8 @@ public class GemFireCacheImpl * * @see #addPartitionedRegion(PartitionedRegion) */ - void removePartitionedRegion(PartitionedRegion region) { + @Override + public void removePartitionedRegion(PartitionedRegion region) { synchronized (this.partitionedRegions) { if (this.partitionedRegions.remove(region)) { getCachePerfStats().incPartitionedRegions(-1); @@ -4383,6 +4406,7 @@ public class GemFireCacheImpl /** * Blocks until no register interests are in progress. */ + @Override public void waitForRegisterInterestsInProgress() { // In *this* particular context, let the caller know that // his cache has been cancelled. doWait below would do that as @@ -4409,11 +4433,13 @@ public class GemFireCacheImpl } } + @Override @SuppressWarnings("ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD") public void setQueryMonitorRequiredForResourceManager(boolean required) { queryMonitorRequiredForResourceManager = required; } + @Override public boolean isQueryMonitorDisabledForLowMemory() { return this.queryMonitorDisabledForLowMem; } @@ -4423,6 +4449,7 @@ public class GemFireCacheImpl * * @since GemFire 6.0 */ + @Override public QueryMonitor getQueryMonitor() { // Check to see if monitor is required if ResourceManager critical heap percentage is set // or whether we override it with the system variable; @@ -4932,7 +4959,8 @@ public class GemFireCacheImpl this.regionsInDestroy.remove(path, region); } - DistributedRegion getRegionInDestroy(String path) { + @Override + public DistributedRegion getRegionInDestroy(String path) { return this.regionsInDestroy.get(path); } @@ -5045,7 +5073,8 @@ public class GemFireCacheImpl return this.serialNumber; } - TXEntryStateFactory getTXEntryStateFactory() { + @Override + public TXEntryStateFactory getTXEntryStateFactory() { return this.txEntryStateFactory; } @@ -5067,6 +5096,7 @@ public class GemFireCacheImpl } // test hook + @Override public void setReadSerialized(boolean value) { this.cacheConfig.setPdxReadSerialized(value); } @@ -5140,6 +5170,7 @@ public class GemFireCacheImpl return PdxInstanceFactoryImpl.newCreator(className, true); } + @Override public PdxInstanceFactory createPdxInstanceFactory(String className, boolean expectDomainClass) { return PdxInstanceFactoryImpl.newCreator(className, expectDomainClass); } @@ -5170,7 +5201,8 @@ public class GemFireCacheImpl return this.getSystem().getOffHeapStore(); } - DiskStoreMonitor getDiskStoreMonitor() { + @Override + public DiskStoreMonitor getDiskStoreMonitor() { return this.diskMonitor; } @@ -5183,10 +5215,6 @@ public class GemFireCacheImpl return this.extensionPoint; } - public static int getClientFunctionTimeout() { - return clientFunctionTimeout; - } - @Override public CqService getCqService() { return this.cqService; http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/internal/cache/GridAdvisor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GridAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GridAdvisor.java index a19a958..03c14ab 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/GridAdvisor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GridAdvisor.java @@ -12,23 +12,26 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ - package org.apache.geode.internal.cache; -import java.util.*; -import java.io.*; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; -import org.apache.geode.*; +import org.apache.geode.DataSerializer; import org.apache.geode.distributed.DistributedSystemDisconnectedException; import org.apache.geode.distributed.Locator; -import org.apache.geode.distributed.internal.*; -import org.apache.geode.distributed.internal.DistributionAdvisor.Profile; -import org.apache.geode.distributed.internal.membership.*; - +import org.apache.geode.distributed.internal.DistributionAdvisee; +import org.apache.geode.distributed.internal.DistributionAdvisor; +import org.apache.geode.distributed.internal.InternalLocator; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; /** * Used to share code with BridgeServerAdvisor and ControllerAdvisor - * */ public abstract class GridAdvisor extends DistributionAdvisor { @@ -38,9 +41,13 @@ public abstract class GridAdvisor extends DistributionAdvisor { } private final Object cacheLock = new Object(); + private volatile List/* <BridgeServerProfile> */ cachedBridgeServerProfiles; + private volatile List/* <ControllerProfile> */ cachedControllerProfiles; + private volatile Set/* <DistributedMember> */ cachedBridgeServerAdvise; + private volatile Set/* <DistributedMember> */ cachedControllerAdvise; private static final Filter CONTROLLER_FILTER = new Filter() { @@ -90,11 +97,12 @@ public abstract class GridAdvisor extends DistributionAdvisor { } /** - * Returns an unmodifiable <code>List</code> of the <code>BridgeServerProfile</code>s for all - * known bridge servers. + * Returns an unmodifiable {@code List} of the {@code BridgeServerProfile}s for all known bridge + * servers. */ public List/* <BridgeServerProfile> */ fetchBridgeServers() { - List/* <BridgeServerProfile> */ result = null; // this.cachedBridgeServerProfiles; + List/* <BridgeServerProfile> */ result = null; + // TODO: remove double-checking if (result == null) { synchronized (this.cacheLock) { // result = this.cachedBridgeServerProfiles; @@ -108,8 +116,8 @@ public abstract class GridAdvisor extends DistributionAdvisor { } /** - * Returns an unmodifiable <code>List</code> of the <code>ControllerProfile</code>s for all known - * cnx controllers. + * Returns an unmodifiable {@code List} of the {@code ControllerProfile}s for all known cnx + * controllers. */ public List/* <ControllerProfile> */ fetchControllers() { List/* <ControllerProfile> */ result = this.cachedControllerProfiles; @@ -224,8 +232,6 @@ public abstract class GridAdvisor extends DistributionAdvisor { profilesChanged(); } - - @Override public Set adviseProfileRemove() { // Our set of profiles includes local members. However, the update @@ -236,12 +242,10 @@ public abstract class GridAdvisor extends DistributionAdvisor { return results; } - - /** * Describes profile data common for all Grid resources */ - public static abstract class GridProfile extends DistributionAdvisor.Profile { + public abstract static class GridProfile extends DistributionAdvisor.Profile { private String host; @@ -323,7 +327,7 @@ public abstract class GridAdvisor extends DistributionAdvisor { */ protected final void tellLocalBridgeServers(boolean removeProfile, boolean exchangeProfiles, final List<Profile> replyProfiles) { - final GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); + final InternalCache cache = GemFireCacheImpl.getInstance(); if (cache != null && !cache.isClosed()) { List<?> bridgeServers = cache.getCacheServersAndGatewayReceiver(); for (int i = 0; i < bridgeServers.size(); i++) { @@ -367,8 +371,8 @@ public abstract class GridAdvisor extends DistributionAdvisor { @Override public void fillInToString(StringBuilder sb) { super.fillInToString(sb); - sb.append("; host=" + this.host); - sb.append("; port=" + this.port); + sb.append("; host=").append(this.host); + sb.append("; port=").append(this.port); } } http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java index 07dd62c..f80f971 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java @@ -89,16 +89,8 @@ public final class HARegion extends DistributedRegion { private volatile HARegionQueue owningQueue; - // private Map giiProviderStates; - - /** - * @param regionName - * @param attrs - * @param parentRegion - * @param cache - */ private HARegion(String regionName, RegionAttributes attrs, LocalRegion parentRegion, - GemFireCacheImpl cache) { + InternalCache cache) { super(regionName, attrs, parentRegion, cache, new InternalRegionArguments().setDestroyLockFlag(true).setRecreateFlag(false) .setSnapshotInputStream(null).setImageTarget(null)); @@ -163,7 +155,7 @@ public final class HARegion extends DistributedRegion { ExpirationAttributes oldAttrs = getEntryTimeToLive(); this.entryTimeToLive = timeToLive.getTimeout(); this.entryTimeToLiveExpirationAction = timeToLive.getAction(); - setEntryTimeToLiveAtts(); + setEntryTimeToLiveAttributes(); updateEntryExpiryPossible(); timeToLiveChanged(oldAttrs); return oldAttrs; @@ -256,7 +248,7 @@ public final class HARegion extends DistributedRegion { * @throws IOException * @throws ClassNotFoundException */ - public static HARegion getInstance(String regionName, GemFireCacheImpl cache, HARegionQueue hrq, + public static HARegion getInstance(String regionName, InternalCache cache, HARegionQueue hrq, RegionAttributes ra) throws TimeoutException, RegionExistsException, IOException, ClassNotFoundException { @@ -441,9 +433,9 @@ public final class HARegion extends DistributedRegion { } @Override - public void fillInProfile(Profile p) { - super.fillInProfile(p); - HARegionAdvisor.HAProfile h = (HARegionAdvisor.HAProfile) p; + public void fillInProfile(Profile profile) { + super.fillInProfile(profile); + HARegionAdvisor.HAProfile h = (HARegionAdvisor.HAProfile) profile; // dunit tests create HARegions without encapsulating them in queues if (this.owningQueue != null) { h.isPrimary = this.owningQueue.isPrimary(); http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java index d0ad5db..d6dc98f 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java @@ -3451,7 +3451,7 @@ public class InitialImageOperation { if (haReg == null || haReg.getName() == null) { throw new ReplyException("HARegion for the proxy is Null."); } - GemFireCacheImpl cache = haReg.getCache(); + InternalCache cache = haReg.getCache(); CacheClientNotifier ccn = CacheClientNotifier.getInstance(); if (ccn == null || ccn.getHaContainer() == null) { http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java index 33a7f52..5533ed1 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java @@ -12,7 +12,6 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ - package org.apache.geode.internal.cache; import java.io.File; @@ -20,6 +19,8 @@ import java.io.IOException; import java.net.URL; import java.util.Collection; import java.util.List; +import java.util.Map; +import java.util.Properties; import java.util.Set; import java.util.concurrent.Executor; @@ -27,26 +28,41 @@ import javax.transaction.TransactionManager; import org.apache.geode.cache.Cache; import org.apache.geode.cache.CacheClosedException; +import org.apache.geode.cache.Declarable; +import org.apache.geode.cache.DiskStore; +import org.apache.geode.cache.DiskStoreFactory; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionAttributes; import org.apache.geode.cache.RegionExistsException; import org.apache.geode.cache.TimeoutException; +import org.apache.geode.cache.asyncqueue.AsyncEventQueue; +import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl; import org.apache.geode.cache.client.internal.ClientMetadataService; import org.apache.geode.cache.query.QueryService; +import org.apache.geode.cache.query.internal.QueryMonitor; import org.apache.geode.cache.query.internal.cq.CqService; +import org.apache.geode.cache.server.CacheServer; +import org.apache.geode.cache.wan.GatewayReceiver; import org.apache.geode.cache.wan.GatewaySender; import org.apache.geode.distributed.DistributedLockService; -import org.apache.geode.distributed.DistributedMember; +import org.apache.geode.distributed.internal.CacheTime; import org.apache.geode.distributed.internal.DM; +import org.apache.geode.distributed.internal.DistributionAdvisor; import org.apache.geode.distributed.internal.InternalDistributedSystem; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.SystemTimer; import org.apache.geode.internal.cache.control.InternalResourceManager; import org.apache.geode.internal.cache.control.ResourceAdvisor; import org.apache.geode.internal.cache.extension.Extensible; +import org.apache.geode.internal.cache.persistence.BackupManager; import org.apache.geode.internal.cache.persistence.PersistentMemberManager; import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier; import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID; +import org.apache.geode.internal.logging.InternalLogWriter; import org.apache.geode.internal.offheap.MemoryAllocator; +import org.apache.geode.management.internal.JmxManagerAdvisor; +import org.apache.geode.management.internal.RestAgent; +import org.apache.geode.pdx.PdxInstanceFactory; import org.apache.geode.pdx.internal.TypeRegistry; /** @@ -56,13 +72,13 @@ import org.apache.geode.pdx.internal.TypeRegistry; * @see org.apache.geode.cache.Cache * @since GemFire 7.0 */ -public interface InternalCache extends Cache, Extensible<Cache> { +public interface InternalCache extends Cache, Extensible<Cache>, CacheTime { - DistributedMember getMyId(); + InternalDistributedMember getMyId(); - Collection<DiskStoreImpl> listDiskStores(); + Collection<DiskStore> listDiskStores(); - Collection<DiskStoreImpl> listDiskStoresIncludingRegionOwned(); + Collection<DiskStore> listDiskStoresIncludingRegionOwned(); CqService getCqService(); @@ -173,11 +189,122 @@ public interface InternalCache extends Cache, Extensible<Cache> { Set<PartitionedRegion> getPartitionedRegions(); - void addRegionListener(RegionListener l); + void addRegionListener(RegionListener regionListener); - void removeRegionListener(RegionListener l); + void removeRegionListener(RegionListener regionListener); Set<RegionListener> getRegionListeners(); CacheConfig getCacheConfig(); + + boolean getPdxReadSerializedByAnyGemFireServices(); + + BackupManager getBackupManager(); + + void setDeclarativeCacheConfig(CacheConfig cacheConfig); + + void initializePdxRegistry(); + + void readyDynamicRegionFactory(); + + void setBackupFiles(List<File> backups); + + void addDeclarableProperties(final Map<Declarable, Properties> mapOfNewDeclarableProps); + + void setInitializer(Declarable initializer, Properties initializerProps); + + boolean hasPool(); + + DiskStoreFactory createDiskStoreFactory(DiskStoreAttributes attrs); + + void determineDefaultPool(); + + <K, V> Region<K, V> basicCreateRegion(String name, RegionAttributes<K, V> attrs) + throws RegionExistsException, TimeoutException; + + BackupManager startBackup(InternalDistributedMember sender) throws IOException; + + Throwable getDisconnectCause(); + + void addPartitionedRegion(PartitionedRegion region); + + void removePartitionedRegion(PartitionedRegion region); + + void addDiskStore(DiskStoreImpl dsi); + + TXEntryStateFactory getTXEntryStateFactory(); + + EventTracker.ExpiryTask getEventTrackerTask(); + + void removeDiskStore(DiskStoreImpl diskStore); + + void addGatewaySender(GatewaySender sender); + + void addAsyncEventQueue(AsyncEventQueueImpl asyncQueue); + + void removeAsyncEventQueue(AsyncEventQueue asyncQueue); + + QueryMonitor getQueryMonitor(); + + void close(String reason, Throwable systemFailureCause, boolean keepAlive, boolean keepDS); + + JmxManagerAdvisor getJmxManagerAdvisor(); + + List<Properties> getDeclarableProperties(final String className); + + int getUpTime(); + + Set<Region<?, ?>> rootRegions(boolean includePRAdminRegions); + + Set<LocalRegion> getAllRegions(); + + DistributedRegion getRegionInDestroy(String path); + + void addRegionOwnedDiskStore(DiskStoreImpl dsi); + + DiskStoreMonitor getDiskStoreMonitor(); + + void close(String reason, Throwable optionalCause); + + LocalRegion getRegionByPathForProcessing(String path); + + List getCacheServersAndGatewayReceiver(); + + boolean isGlobalRegionInitializing(String fullPath); + + DistributionAdvisor getDistributionAdvisor(); + + void setQueryMonitorRequiredForResourceManager(boolean required); + + boolean isQueryMonitorDisabledForLowMemory(); + + boolean isRESTServiceRunning(); + + InternalLogWriter getInternalLogWriter(); + + InternalLogWriter getSecurityInternalLogWriter(); + + Set<LocalRegion> getApplicationRegions(); + + void removeGatewaySender(GatewaySender sender); + + DistributedLockService getGatewaySenderLockService(); + + RestAgent getRestAgent(); + + Properties getDeclarableProperties(final Declarable declarable); + + void setRESTServiceRunning(boolean isRESTServiceRunning); + + void close(String reason, boolean keepAlive, boolean keepDS); + + void addGatewayReceiver(GatewayReceiver receiver); + + CacheServer addCacheServer(boolean isGatewayReceiver); + + void setReadSerialized(boolean value); + + PdxInstanceFactory createPdxInstanceFactory(String className, boolean expectDomainClass); + + void waitForRegisterInterestsInProgress(); }
