http://git-wip-us.apache.org/repos/asf/geode/blob/640b9f03/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionHelper.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionHelper.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionHelper.java index 08dac6e..fe76863 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionHelper.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionHelper.java @@ -12,7 +12,6 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ - package org.apache.geode.internal.cache; import java.io.IOException; @@ -47,7 +46,6 @@ import org.apache.geode.cache.Scope; import org.apache.geode.cache.partition.PartitionNotAvailableException; import org.apache.geode.cache.util.CacheListenerAdapter; import org.apache.geode.cache.util.CacheWriterAdapter; -import org.apache.geode.distributed.DistributedLockService; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.DistributedSystem; import org.apache.geode.distributed.internal.DM; @@ -63,21 +61,12 @@ import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.LogService; import org.apache.geode.internal.logging.log4j.LocalizedMessage; -/** - */ public class PartitionedRegionHelper { private static final Logger logger = LogService.getLogger(); - // ///////////// All the final variable ////////////////// /** 1 MB */ static final long BYTES_PER_MB = 1024 * 1024; - /** Name of allPartitionedRegions Region * */ - // static final String PARTITIONED_REGION_CONFIG_NAME = "__Config"; - - /** Prefix for the bucket2Node Region name defined in the global space. */ - // static final String BUCKET_2_NODE_TABLE_PREFIX = "_B2N_"; - /** * The administrative region used for storing Partitioned Region meta data sub regions * */ @@ -121,8 +110,6 @@ public class PartitionedRegionHelper { ALLOWED_DATA_POLICIES = Collections.unmodifiableSet(policies); } - - /** * This function is used for cleaning the config meta data for the failed or closed * PartitionedRegion node. @@ -132,7 +119,7 @@ public class PartitionedRegionHelper { * @param cache GemFire cache. */ static void removeGlobalMetadataForFailedNode(Node failedNode, String regionIdentifier, - GemFireCacheImpl cache) { + InternalCache cache) { removeGlobalMetadataForFailedNode(failedNode, regionIdentifier, cache, true); } @@ -146,13 +133,11 @@ public class PartitionedRegionHelper { * @param lock True if this removal should acquire and release the RegionLock */ static void removeGlobalMetadataForFailedNode(Node failedNode, String regionIdentifier, - GemFireCacheImpl cache, final boolean lock) { + InternalCache cache, final boolean lock) { Region root = PartitionedRegionHelper.getPRRoot(cache, false); if (root == null) { return; // no partitioned region info to clean up } - // Region allPartitionedRegions = PartitionedRegionHelper.getPRConfigRegion( - // root, cache); PartitionRegionConfig prConfig = (PartitionRegionConfig) root.get(regionIdentifier); if (null == prConfig || !prConfig.containsNode(failedNode)) { return; @@ -163,9 +148,6 @@ public class PartitionedRegionHelper { try { if (lock) { rl.lock(); - // if (!rl.lock()) { - // return; - // } } prConfig = (PartitionRegionConfig) root.get(regionIdentifier); if (prConfig != null && prConfig.containsNode(failedNode)) { @@ -204,7 +186,7 @@ public class PartitionedRegionHelper { /** * Return a region that is the root for all Partitioned Region metadata on this node */ - public static LocalRegion getPRRoot(final Cache cache) { + public static LocalRegion getPRRoot(final InternalCache cache) { return getPRRoot(cache, true); } @@ -215,9 +197,8 @@ public class PartitionedRegionHelper { * * @return a GLOBLAL scoped root region used for PartitionedRegion administration */ - public static LocalRegion getPRRoot(final Cache cache, boolean createIfAbsent) { - GemFireCacheImpl gemCache = (GemFireCacheImpl) cache; - DistributedRegion root = (DistributedRegion) gemCache.getRegion(PR_ROOT_REGION_NAME, true); + public static LocalRegion getPRRoot(final InternalCache cache, boolean createIfAbsent) { + DistributedRegion root = (DistributedRegion) cache.getRegion(PR_ROOT_REGION_NAME, true); if (root == null) { if (!createIfAbsent) { return null; @@ -287,13 +268,13 @@ public class PartitionedRegionHelper { }; try { - root = (DistributedRegion) gemCache.createVMRegion(PR_ROOT_REGION_NAME, ra, + root = (DistributedRegion) cache.createVMRegion(PR_ROOT_REGION_NAME, ra, new InternalRegionArguments().setIsUsedForPartitionedRegionAdmin(true) .setInternalRegion(true).setCachePerfStatsHolder(prMetaStatsHolder)); root.getDistributionAdvisor().addMembershipListener(new MemberFailureListener()); - } catch (RegionExistsException silly) { + } catch (RegionExistsException ignore) { // we avoid this before hand, but yet we have to catch it - root = (DistributedRegion) gemCache.getRegion(PR_ROOT_REGION_NAME, true); + root = (DistributedRegion) cache.getRegion(PR_ROOT_REGION_NAME, true); } catch (IOException ieo) { Assert.assertTrue(false, "IOException creating Partitioned Region root: " + ieo); } catch (ClassNotFoundException cne) { @@ -326,7 +307,7 @@ public class PartitionedRegionHelper { */ public static void cleanUpMetaDataOnNodeFailure(DistributedMember failedMemId) { try { - final GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); + final InternalCache cache = GemFireCacheImpl.getInstance(); if (cache == null || cache.getCancelCriterion().isCancelInProgress()) { return; } @@ -343,13 +324,13 @@ public class PartitionedRegionHelper { final ArrayList<String> ks = new ArrayList<String>(rootReg.keySet()); if (ks.size() > 1) { - Collections.shuffle(ks, PartitionedRegion.rand); + Collections.shuffle(ks, PartitionedRegion.RANDOM); } for (String prName : ks) { try { cleanUpMetaDataForRegion(cache, prName, failedMemId, null); - } catch (CancelException e) { + } catch (CancelException ignore) { // okay to ignore this - metadata will be cleaned up by cache close operation } catch (Exception e) { if (logger.isDebugEnabled()) { @@ -357,12 +338,12 @@ public class PartitionedRegionHelper { } } } - } catch (CancelException e) { + } catch (CancelException ignore) { // ignore } } - public static void cleanUpMetaDataForRegion(final GemFireCacheImpl cache, final String prName, + public static void cleanUpMetaDataForRegion(final InternalCache cache, final String prName, final DistributedMember failedMemId, final Runnable postCleanupTask) { boolean runPostCleanUp = true; try { @@ -373,7 +354,7 @@ public class PartitionedRegionHelper { } try { prConf = (PartitionRegionConfig) rootReg.get(prName); - } catch (EntryDestroyedException ede) { + } catch (EntryDestroyedException ignore) { return; } if (prConf == null) { @@ -419,7 +400,7 @@ public class PartitionedRegionHelper { * This is a function for cleaning the config meta data (both the configuration data and the * buckets) for a Node that hosted a PartitionedRegion */ - private static void cleanPartitionedRegionMetaDataForNode(GemFireCacheImpl cache, Node node, + private static void cleanPartitionedRegionMetaDataForNode(InternalCache cache, Node node, PartitionRegionConfig prConf, String regionIdentifier) { if (logger.isDebugEnabled()) { logger.debug( @@ -691,7 +672,6 @@ public class PartitionedRegionHelper { /** * Find a ProxyBucketRegion by parsing the region fullPath * - * @param cache * @param fullPath full region path to parse * @param postInit true if caller should wait for bucket initialization to complete * @return ProxyBucketRegion as Bucket or null if not found @@ -780,15 +760,15 @@ public class PartitionedRegionHelper { public static String escapePRPath(String prFullPath) { String escaped = prFullPath.replace("_", "__"); - escaped = escaped.replace(LocalRegion.SEPARATOR_CHAR, '_'); + escaped = escaped.replace(Region.SEPARATOR_CHAR, '_'); return escaped; } - public static String TWO_SEPARATORS = LocalRegion.SEPARATOR + LocalRegion.SEPARATOR; + public static String TWO_SEPARATORS = Region.SEPARATOR + Region.SEPARATOR; public static String unescapePRPath(String escapedPath) { - String path = escapedPath.replace('_', LocalRegion.SEPARATOR_CHAR); + String path = escapedPath.replace('_', Region.SEPARATOR_CHAR); path = path.replace(TWO_SEPARATORS, "_"); return path; } @@ -841,33 +821,9 @@ public class PartitionedRegionHelper { } /** - * This method returns true if the member is found in the membership list of this member, else - * false. - * - * @param mem - * @param cache - * @return true if mem is found in membership list of this member. - */ - public static boolean isMemberAlive(DistributedMember mem, GemFireCacheImpl cache) { - return getMembershipSet(cache).contains(mem); - } - - /** - * Returns the current membership Set for this member. - * - * @param cache - * @return membership Set. - */ - public static Set getMembershipSet(GemFireCacheImpl cache) { - return cache.getInternalDistributedSystem().getDistributionManager() - .getDistributionManagerIds(); - } - - /** * Utility method to print warning when nodeList in b2n region is found empty. This will signify * potential data loss scenario. * - * @param partitionedRegion * @param bucketId Id of Bucket whose nodeList in b2n is empty. * @param callingMethod methodName of the calling method. */ @@ -887,7 +843,7 @@ public class PartitionedRegionHelper { Set members = partitionedRegion.getDistributionManager().getDistributionManagerIds(); logger.warn(LocalizedMessage.create( LocalizedStrings.PartitionedRegionHelper_DATALOSS___0____SIZE_OF_NODELIST_AFTER_VERIFYBUCKETNODES_FOR_BUKID___1__IS_0, - new Object[] {callingMethod, Integer.valueOf(bucketId)})); + new Object[] {callingMethod, bucketId})); logger.warn(LocalizedMessage.create( LocalizedStrings.PartitionedRegionHelper_DATALOSS___0____NODELIST_FROM_PRCONFIG___1, new Object[] {callingMethod, printCollection(prConfig.getNodes())})); @@ -899,12 +855,11 @@ public class PartitionedRegionHelper { /** * Utility method to print a collection. * - * @param c * @return String */ public static String printCollection(Collection c) { if (c != null) { - StringBuffer sb = new StringBuffer("["); + StringBuilder sb = new StringBuilder("["); Iterator itr = c.iterator(); while (itr.hasNext()) { sb.append(itr.next()); @@ -919,42 +874,6 @@ public class PartitionedRegionHelper { } } - /** - * Destroys and removes the distributed lock service. This is called from cache closure operation. - * - * @see PartitionedRegion#afterRegionsClosedByCacheClose(GemFireCacheImpl) - */ - static void destroyLockService() { - DistributedLockService dls = null; - synchronized (dlockMonitor) { - dls = DistributedLockService.getServiceNamed(PARTITION_LOCK_SERVICE_NAME); - } - if (dls != null) { - try { - DistributedLockService.destroy(PARTITION_LOCK_SERVICE_NAME); - } catch (IllegalArgumentException ex) { - // Our dlockService is already destroyed, - // probably by another thread - ignore - } - } - } - - public static boolean isBucketPrimary(Bucket buk) { - return buk.getBucketAdvisor().isPrimary(); - } - - public static boolean isRemotePrimaryAvailable(PartitionedRegion region, - FixedPartitionAttributesImpl fpa) { - List<FixedPartitionAttributesImpl> fpaList = region.getRegionAdvisor().adviseSameFPAs(fpa); - - for (FixedPartitionAttributes remotefpa : fpaList) { - if (remotefpa.isPrimary()) { - return true; - } - } - return false; - } - public static FixedPartitionAttributesImpl getFixedPartitionAttributesForBucket( PartitionedRegion pr, int bucketId) { List<FixedPartitionAttributesImpl> localFPAs = pr.getFixedPartitionAttributesImpl(); @@ -974,7 +893,7 @@ public class PartitionedRegionHelper { return fpa; } } - Object[] prms = new Object[] {pr.getName(), Integer.valueOf(bucketId)}; + Object[] prms = new Object[] {pr.getName(), bucketId}; throw new PartitionNotAvailableException( LocalizedStrings.PartitionedRegionHelper_FOR_FIXED_PARTITIONED_REGION_0_FIXED_PARTITION_IS_NOT_AVAILABLE_FOR_BUCKET_1_ON_ANY_DATASTORE .toLocalizedString(prms)); @@ -1027,42 +946,41 @@ public class PartitionedRegionHelper { List<InternalDistributedMember> remaining) {} } -} - -class FixedPartitionAttributesListener extends CacheListenerAdapter { - private static final Logger logger = LogService.getLogger(); + static class FixedPartitionAttributesListener extends CacheListenerAdapter { + private static final Logger logger = LogService.getLogger(); - public void afterCreate(EntryEvent event) { - PartitionRegionConfig prConfig = (PartitionRegionConfig) event.getNewValue(); - if (!prConfig.getElderFPAs().isEmpty()) { - updatePartitionMap(prConfig); + public void afterCreate(EntryEvent event) { + PartitionRegionConfig prConfig = (PartitionRegionConfig) event.getNewValue(); + if (!prConfig.getElderFPAs().isEmpty()) { + updatePartitionMap(prConfig); + } } - } - public void afterUpdate(EntryEvent event) { - PartitionRegionConfig prConfig = (PartitionRegionConfig) event.getNewValue(); - if (!prConfig.getElderFPAs().isEmpty()) { - updatePartitionMap(prConfig); + public void afterUpdate(EntryEvent event) { + PartitionRegionConfig prConfig = (PartitionRegionConfig) event.getNewValue(); + if (!prConfig.getElderFPAs().isEmpty()) { + updatePartitionMap(prConfig); + } } - } - private void updatePartitionMap(PartitionRegionConfig prConfig) { - int prId = prConfig.getPRId(); - PartitionedRegion pr = null; + private void updatePartitionMap(PartitionRegionConfig prConfig) { + int prId = prConfig.getPRId(); + PartitionedRegion pr = null; - try { - pr = PartitionedRegion.getPRFromId(prId); - if (pr != null) { - Map<String, Integer[]> partitionMap = pr.getPartitionsMap(); - for (FixedPartitionAttributesImpl fxPrAttr : prConfig.getElderFPAs()) { - partitionMap.put(fxPrAttr.getPartitionName(), - new Integer[] {fxPrAttr.getStartingBucketID(), fxPrAttr.getNumBuckets()}); + try { + pr = PartitionedRegion.getPRFromId(prId); + if (pr != null) { + Map<String, Integer[]> partitionMap = pr.getPartitionsMap(); + for (FixedPartitionAttributesImpl fxPrAttr : prConfig.getElderFPAs()) { + partitionMap.put(fxPrAttr.getPartitionName(), + new Integer[] {fxPrAttr.getStartingBucketID(), fxPrAttr.getNumBuckets()}); + } } + } catch (PRLocallyDestroyedException e) { + logger.debug("PRLocallyDestroyedException : Region ={} is locally destroyed on this node", + prConfig.getPRId(), e); } - } catch (PRLocallyDestroyedException e) { - logger.debug("PRLocallyDestroyedException : Region ={} is locally destroyed on this node", - prConfig.getPRId(), e); } } }
http://git-wip-us.apache.org/repos/asf/geode/blob/640b9f03/geode-core/src/main/java/org/apache/geode/internal/cache/PoolFactoryImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PoolFactoryImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PoolFactoryImpl.java index ef7cf03..00f50d2 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PoolFactoryImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PoolFactoryImpl.java @@ -14,6 +14,19 @@ */ package org.apache.geode.internal.cache; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Properties; + +import org.apache.logging.log4j.Logger; + import org.apache.geode.DataSerializable; import org.apache.geode.DataSerializer; import org.apache.geode.cache.CacheException; @@ -28,18 +41,6 @@ import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.LogService; import org.apache.geode.internal.logging.log4j.LocalizedMessage; import org.apache.geode.pdx.internal.TypeRegistry; -import org.apache.logging.log4j.Logger; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Properties; /** * Implementation of PoolFactory. @@ -217,7 +218,7 @@ public class PoolFactoryImpl implements PoolFactory { InetAddress hostAddr = InetAddress.getByName(host); InetSocketAddress sockAddr = new InetSocketAddress(hostAddr, port); l.add(sockAddr); - } catch (UnknownHostException cause) { + } catch (UnknownHostException ignore) { // IllegalArgumentException ex = new IllegalArgumentException("Unknown host " + host); // ex.initCause(cause); // throw ex; @@ -310,7 +311,7 @@ public class PoolFactoryImpl implements PoolFactory { * @since GemFire 5.7 */ public Pool create(String name) throws CacheException { - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); + InternalCache cache = GemFireCacheImpl.getInstance(); if (cache != null) { TypeRegistry registry = cache.getPdxRegistry(); if (registry != null && !attributes.isGateway()) { http://git-wip-us.apache.org/repos/asf/geode/blob/640b9f03/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyBucketRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyBucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyBucketRegion.java index 06378f2..9d4b5e2 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyBucketRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyBucketRegion.java @@ -206,7 +206,7 @@ public final class ProxyBucketRegion implements Bucket { + getPartitionedRegion().getBucketName(this.bid); } - public GemFireCacheImpl getCache() { + public InternalCache getCache() { return this.partitionedRegion.getCache(); } http://git-wip-us.apache.org/repos/asf/geode/blob/640b9f03/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyRegionMap.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyRegionMap.java index 8ed07f8..01b7041 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyRegionMap.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyRegionMap.java @@ -280,8 +280,8 @@ final class ProxyRegionMap implements RegionMap { List<EntryEventImpl> pendingCallbacks, FilterRoutingInfo filterRoutingInfo, ClientProxyMembershipID bridgeContext, TXEntryState txEntryState, VersionTag versionTag, long tailKey) { - this.owner.txApplyInvalidatePart2(markerEntry, key, didDestroy, true, - false /* Clear conflic occured */); + this.owner.txApplyInvalidatePart2(markerEntry, key, didDestroy, true + /* Clear conflic occured */); if (this.owner.isInitialized()) { if (txEvent != null) { txEvent.addInvalidate(this.owner, markerEntry, key, newValue, aCallbackArgument); @@ -318,7 +318,7 @@ final class ProxyRegionMap implements RegionMap { long tailKey) { Operation putOp = p_putOp.getCorrespondingCreateOp(); long lastMod = owner.cacheTimeMillis(); - this.owner.txApplyPutPart2(markerEntry, key, newValue, lastMod, true, didDestroy, + this.owner.txApplyPutPart2(markerEntry, key, lastMod, true, didDestroy, false /* Clear conflict occured */); if (this.owner.isInitialized()) { if (txEvent != null) { @@ -582,12 +582,6 @@ final class ProxyRegionMap implements RegionMap { .toLocalizedString(DataPolicy.EMPTY)); } - /* - * (non-Javadoc) - * - * @see org.apache.geode.internal.cache.RegionEntry#getSerializedValueOnDisk(org.apache.geode. - * internal.cache.LocalRegion) - */ public Object getSerializedValueOnDisk(LocalRegion localRegion) { throw new UnsupportedOperationException( LocalizedStrings.ProxyRegionMap_NO_ENTRY_SUPPORT_ON_REGIONS_WITH_DATAPOLICY_0 http://git-wip-us.apache.org/repos/asf/geode/blob/640b9f03/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEvictorTask.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEvictorTask.java b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEvictorTask.java index a467726..96d871d 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEvictorTask.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEvictorTask.java @@ -14,25 +14,24 @@ */ package org.apache.geode.internal.cache; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.Callable; + +import org.apache.logging.log4j.Logger; + import org.apache.geode.cache.RegionDestroyedException; import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.internal.cache.lru.HeapEvictor; import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.LogService; import org.apache.geode.internal.logging.log4j.LocalizedMessage; -import org.apache.logging.log4j.Logger; - -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.Callable; /** - * * Takes delta to be evicted and tries to evict the least no of LRU entry which would make * evictedBytes more than or equal to the delta * * @since GemFire 6.0 - * */ public class RegionEvictorTask implements Runnable { @@ -77,7 +76,7 @@ public class RegionEvictorTask implements Runnable { } } - private GemFireCacheImpl getGemFireCache() { + private InternalCache getInternalCache() { return getHeapEvictor().getGemFireCache(); } @@ -87,12 +86,12 @@ public class RegionEvictorTask implements Runnable { @Override public void run() { - getGemFireCache().getCachePerfStats().incEvictorJobsStarted(); + getInternalCache().getCachePerfStats().incEvictorJobsStarted(); long bytesEvicted = 0; long totalBytesEvicted = 0; try { while (true) { - getGemFireCache().getCachePerfStats(); + getInternalCache().getCachePerfStats(); final long start = CachePerfStats.getStatTime(); synchronized (this.regionSet) { if (this.regionSet.isEmpty()) { @@ -121,15 +120,15 @@ public class RegionEvictorTask implements Runnable { logger.warn(LocalizedMessage.create(LocalizedStrings.Eviction_EVICTOR_TASK_EXCEPTION, new Object[] {e.getMessage()}), e); } finally { - getGemFireCache().getCachePerfStats(); + getInternalCache().getCachePerfStats(); long end = CachePerfStats.getStatTime(); - getGemFireCache().getCachePerfStats().incEvictWorkTime(end - start); + getInternalCache().getCachePerfStats().incEvictWorkTime(end - start); } } } } } finally { - getGemFireCache().getCachePerfStats().incEvictorJobsCompleted(); + getInternalCache().getCachePerfStats().incEvictorJobsCompleted(); } } http://git-wip-us.apache.org/repos/asf/geode/blob/640b9f03/geode-core/src/main/java/org/apache/geode/internal/cache/RegionFactoryImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionFactoryImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionFactoryImpl.java index b6989f9..813f3c6 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionFactoryImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionFactoryImpl.java @@ -14,33 +14,29 @@ */ package org.apache.geode.internal.cache; -import java.io.File; -import java.util.Properties; - -import org.apache.geode.CancelException; -import org.apache.geode.cache.*; -import org.apache.geode.cache.client.ClientNotReadyException; +import org.apache.geode.cache.RegionAttributes; +import org.apache.geode.cache.RegionFactory; +import org.apache.geode.cache.RegionShortcut; /** - * <code>RegionFactoryImpl</code> extends RegionFactory adding {@link RegionShortcut} support. + * {@code RegionFactoryImpl} extends RegionFactory adding {@link RegionShortcut} support. * * @since GemFire 6.5 */ - public class RegionFactoryImpl<K, V> extends RegionFactory<K, V> { - public RegionFactoryImpl(GemFireCacheImpl cache) { + public RegionFactoryImpl(InternalCache cache) { super(cache); } - public RegionFactoryImpl(GemFireCacheImpl cache, RegionShortcut pra) { + public RegionFactoryImpl(InternalCache cache, RegionShortcut pra) { super(cache, pra); } - public RegionFactoryImpl(GemFireCacheImpl cache, RegionAttributes ra) { + public RegionFactoryImpl(InternalCache cache, RegionAttributes ra) { super(cache, ra); } - public RegionFactoryImpl(GemFireCacheImpl cache, String regionAttributesId) { + public RegionFactoryImpl(InternalCache cache, String regionAttributesId) { super(cache, regionAttributesId); } http://git-wip-us.apache.org/repos/asf/geode/blob/640b9f03/geode-core/src/main/java/org/apache/geode/internal/cache/RemoteOperationMessage.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/RemoteOperationMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/RemoteOperationMessage.java index 2c3fc95..765f707 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/RemoteOperationMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/RemoteOperationMessage.java @@ -43,8 +43,6 @@ import org.apache.geode.distributed.internal.ReplyMessage; import org.apache.geode.distributed.internal.ReplyProcessor21; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.Assert; -import org.apache.geode.internal.InternalDataSerializer; -import org.apache.geode.internal.Version; import org.apache.geode.internal.cache.partitioned.PutMessage; import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.LogService; @@ -77,6 +75,7 @@ public abstract class RemoteOperationMessage extends DistributionMessage * The unique transaction Id on the sending member, used to construct a TXId on the receiving side */ private int txUniqId = TXManagerImpl.NOTX; + private InternalDistributedMember txMemberId = null; protected transient short flags; @@ -84,8 +83,9 @@ public abstract class RemoteOperationMessage extends DistributionMessage /* TODO [DISTTX] Convert into flag */ protected boolean isTransactionDistributed = false; - public RemoteOperationMessage() {} - + public RemoteOperationMessage() { + // do nothing + } public RemoteOperationMessage(InternalDistributedMember recipient, String regionPath, ReplyProcessor21 processor) { @@ -93,7 +93,7 @@ public abstract class RemoteOperationMessage extends DistributionMessage setRecipient(recipient); this.regionPath = regionPath; this.processorId = processor == null ? 0 : processor.getProcessorId(); - if (processor != null && this.isSevereAlertCompatible()) { + if (processor != null && isSevereAlertCompatible()) { processor.enableSevereAlertProcessing(); } this.txUniqId = TXManagerImpl.getCurrentTXUniqueId(); @@ -108,7 +108,7 @@ public abstract class RemoteOperationMessage extends DistributionMessage setRecipients(recipients); this.regionPath = regionPath; this.processorId = processor == null ? 0 : processor.getProcessorId(); - if (processor != null && this.isSevereAlertCompatible()) { + if (processor != null && isSevereAlertCompatible()) { processor.enableSevereAlertProcessing(); } this.txUniqId = TXManagerImpl.getCurrentTXUniqueId(); @@ -121,8 +121,6 @@ public abstract class RemoteOperationMessage extends DistributionMessage /** * Copy constructor that initializes the fields declared in this class - * - * @param other */ public RemoteOperationMessage(RemoteOperationMessage other) { this.regionPath = other.regionPath; @@ -152,7 +150,7 @@ public abstract class RemoteOperationMessage extends DistributionMessage /** * @return the full path of the region */ - public final String getRegionPath() { + public String getRegionPath() { return regionPath; } @@ -161,30 +159,15 @@ public abstract class RemoteOperationMessage extends DistributionMessage * is required. */ @Override - public final int getProcessorId() { + public int getProcessorId() { return this.processorId; } /** - * @param processorId1 the {@link org.apache.geode.distributed.internal.ReplyProcessor21} id - * associated with the message, null if no acknowlegement is required. - */ - public final void registerProcessor(int processorId1) { - this.processorId = processorId1; - } - - public void setCacheOpRecipients(Collection cacheOpRecipients) { - // TODO need to implement this for other remote ops - assert this instanceof RemotePutMessage; - } - - - /** * check to see if the cache is closing */ public boolean checkCacheClosing(DistributionManager dm) { - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); - // return (cache != null && cache.isClosed()); + InternalCache cache = GemFireCacheImpl.getInstance(); return cache == null || cache.isClosed(); } @@ -218,14 +201,14 @@ public abstract class RemoteOperationMessage extends DistributionMessage .toLocalizedString(dm.getId())); return; } - GemFireCacheImpl gfc = getCache(dm); - r = getRegionByPath(gfc); + InternalCache cache = getCache(dm); + r = getRegionByPath(cache); if (r == null && failIfRegionMissing()) { // if the distributed system is disconnecting, don't send a reply saying // the partitioned region can't be found (bug 36585) thr = new RegionDestroyedException( - LocalizedStrings.RemoteOperationMessage_0_COULD_NOT_FIND_REGION_1.toLocalizedString( - new Object[] {dm.getDistributionManagerId(), regionPath}), + LocalizedStrings.RemoteOperationMessage_0_COULD_NOT_FIND_REGION_1 + .toLocalizedString(dm.getDistributionManagerId(), regionPath), regionPath); return; // reply sent in finally block below } @@ -233,7 +216,7 @@ public abstract class RemoteOperationMessage extends DistributionMessage thr = UNHANDLED_EXCEPTION; // [bruce] r might be null here, so we have to go to the cache instance to get the txmgr - TXManagerImpl txMgr = getTXManager(gfc); + TXManagerImpl txMgr = getTXManager(cache); TXStateProxy tx = txMgr.masqueradeAs(this); if (tx == null) { sendReply = operateOnRegion(dm, r, startTime); @@ -315,16 +298,16 @@ public abstract class RemoteOperationMessage extends DistributionMessage } } - TXManagerImpl getTXManager(GemFireCacheImpl cache) { + TXManagerImpl getTXManager(InternalCache cache) { return cache.getTxManager(); } - LocalRegion getRegionByPath(GemFireCacheImpl gfc) { - return gfc.getRegionByPathForProcessing(this.regionPath); + LocalRegion getRegionByPath(InternalCache internalCache) { + return internalCache.getRegionByPathForProcessing(this.regionPath); } - GemFireCacheImpl getCache(final DistributionManager dm) { - return (GemFireCacheImpl) CacheFactory.getInstance(dm.getSystem()); + InternalCache getCache(final DistributionManager dm) { + return (InternalCache) CacheFactory.getInstance(dm.getSystem()); } /** @@ -441,7 +424,7 @@ public abstract class RemoteOperationMessage extends DistributionMessage } } - protected final InternalDistributedMember getTXMemberId() { + protected InternalDistributedMember getTXMemberId() { return txMemberId; } @@ -502,12 +485,11 @@ public abstract class RemoteOperationMessage extends DistributionMessage /** * @return the txUniqId */ - public final int getTXUniqId() { + public int getTXUniqId() { return txUniqId; } - - public final InternalDistributedMember getMemberToMasqueradeAs() { + public InternalDistributedMember getMemberToMasqueradeAs() { if (txMemberId == null) { return getSender(); } @@ -583,15 +565,15 @@ public abstract class RemoteOperationMessage extends DistributionMessage if (removeMember(id, true)) { this.prce = new ForceReattemptException( LocalizedStrings.PartitionMessage_PARTITIONRESPONSE_GOT_MEMBERDEPARTED_EVENT_FOR_0_CRASHED_1 - .toLocalizedString(new Object[] {id, Boolean.valueOf(crashed)})); + .toLocalizedString(id, crashed)); } checkIfDone(); } else { Exception e = new Exception( LocalizedStrings.PartitionMessage_MEMBERDEPARTED_GOT_NULL_MEMBERID.toLocalizedString()); logger.info(LocalizedMessage.create( - LocalizedStrings.PartitionMessage_MEMBERDEPARTED_GOT_NULL_MEMBERID_CRASHED_0, - Boolean.valueOf(crashed)), e); + LocalizedStrings.PartitionMessage_MEMBERDEPARTED_GOT_NULL_MEMBERID_CRASHED_0, crashed), + e); } } @@ -599,9 +581,8 @@ public abstract class RemoteOperationMessage extends DistributionMessage * Waits for the response from the {@link RemoteOperationMessage}'s recipient * * @throws CacheException if the recipient threw a cache exception during message processing - * @throws PrimaryBucketException */ - final public void waitForCacheException() + public void waitForCacheException() throws CacheException, RemoteOperationException, PrimaryBucketException { try { waitForRepliesUninterruptibly(); @@ -630,8 +611,7 @@ public abstract class RemoteOperationMessage extends DistributionMessage throw new PrimaryBucketException( LocalizedStrings.PartitionMessage_PEER_FAILED_PRIMARY_TEST.toLocalizedString(), t); } else if (t instanceof RegionDestroyedException) { - RegionDestroyedException rde = (RegionDestroyedException) t; - throw rde; + throw (RegionDestroyedException) t; } else if (t instanceof CancelException) { if (logger.isDebugEnabled()) { logger.debug( @@ -677,7 +657,7 @@ public abstract class RemoteOperationMessage extends DistributionMessage * For Distributed Tx */ private void setIfTransactionDistributed() { - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); + InternalCache cache = GemFireCacheImpl.getInstance(); if (cache != null) { if (cache.getTxManager() != null) { this.isTransactionDistributed = cache.getTxManager().isDistributed(); http://git-wip-us.apache.org/repos/asf/geode/blob/640b9f03/geode-core/src/main/java/org/apache/geode/internal/cache/RemotePutMessage.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/RemotePutMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/RemotePutMessage.java index 889c019..acf77ba 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/RemotePutMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/RemotePutMessage.java @@ -52,19 +52,13 @@ import org.apache.geode.internal.cache.versions.DiskVersionTag; import org.apache.geode.internal.cache.versions.VersionTag; import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.LogService; -import org.apache.geode.internal.logging.log4j.LocalizedMessage; import org.apache.geode.internal.logging.log4j.LogMarker; import org.apache.geode.internal.offheap.StoredObject; import org.apache.geode.internal.offheap.annotations.Released; import org.apache.geode.internal.offheap.annotations.Unretained; -import org.apache.geode.internal.util.BlobHelper; -import org.apache.geode.internal.util.Breadcrumbs; import static org.apache.geode.internal.offheap.annotations.OffHeapIdentifier.ENTRY_EVENT_OLD_VALUE; import static org.apache.geode.internal.offheap.annotations.OffHeapIdentifier.ENTRY_EVENT_NEW_VALUE; -import static org.apache.geode.internal.cache.DistributedCacheOperation.VALUE_IS_BYTES; -import static org.apache.geode.internal.cache.DistributedCacheOperation.VALUE_IS_SERIALIZED_OBJECT; -import static org.apache.geode.internal.cache.DistributedCacheOperation.VALUE_IS_OBJECT; /** * A Replicate Region update message. Meant to be sent only to the peer who hosts transactional @@ -479,11 +473,6 @@ public final class RemotePutMessage extends RemoteOperationMessageWithDirectRepl this.op = operation; } - @Override - public void setCacheOpRecipients(Collection cacheOpRecipients) { - this.cacheOpRecipients = cacheOpRecipients; - } - /** * sets the instance variable hasOldValue to the giving boolean value. */ http://git-wip-us.apache.org/repos/asf/geode/blob/640b9f03/geode-core/src/main/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessor.java index 65cda5d..34f6b73 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessor.java @@ -12,16 +12,56 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ - package org.apache.geode.internal.cache; -/* enumerate each imported class because conflict with dl.u.c.TimeoutException */ +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.NotSerializableException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; + +import org.apache.logging.log4j.Logger; -import org.apache.geode.*; -import org.apache.geode.cache.*; +import org.apache.geode.CancelCriterion; +import org.apache.geode.CancelException; +import org.apache.geode.DataSerializer; +import org.apache.geode.GemFireException; +import org.apache.geode.InternalGemFireException; +import org.apache.geode.SystemFailure; +import org.apache.geode.cache.CacheEvent; +import org.apache.geode.cache.CacheFactory; +import org.apache.geode.cache.CacheLoader; +import org.apache.geode.cache.CacheLoaderException; +import org.apache.geode.cache.CacheWriter; +import org.apache.geode.cache.CacheWriterException; +import org.apache.geode.cache.DataPolicy; +import org.apache.geode.cache.EntryEvent; +import org.apache.geode.cache.LoaderHelper; +import org.apache.geode.cache.Operation; +import org.apache.geode.cache.RegionAttributes; +import org.apache.geode.cache.RegionDestroyedException; +import org.apache.geode.cache.RegionEvent; +import org.apache.geode.cache.Scope; +import org.apache.geode.cache.TimeoutException; import org.apache.geode.cache.util.ObjectSizer; import org.apache.geode.distributed.DistributedSystemDisconnectedException; -import org.apache.geode.distributed.internal.*; +import org.apache.geode.distributed.internal.DM; +import org.apache.geode.distributed.internal.DistributionConfig; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.HighPriorityDistributionMessage; +import org.apache.geode.distributed.internal.MembershipListener; +import org.apache.geode.distributed.internal.PooledDistributionMessage; +import org.apache.geode.distributed.internal.ProcessorKeeper21; +import org.apache.geode.distributed.internal.ReplyProcessor21; +import org.apache.geode.distributed.internal.SerialDistributionMessage; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.Assert; import org.apache.geode.internal.InternalDataSerializer; @@ -31,19 +71,9 @@ import org.apache.geode.internal.cache.versions.VersionTag; import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.LogService; import org.apache.geode.internal.logging.log4j.LocalizedMessage; +import org.apache.geode.internal.offheap.Releasable; import org.apache.geode.internal.offheap.annotations.Released; import org.apache.geode.internal.offheap.annotations.Retained; -import org.apache.logging.log4j.Logger; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.io.NotSerializableException; -import java.util.*; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; - /** * Implementation for distributed search, load and write operations in the GemFire system. Provides @@ -54,18 +84,15 @@ import java.util.concurrent.locks.Lock; * times.netLoad happens as a one phase operation in all cases except where the scope is GLOBAL At * the receiving end, the request is converted into an appropriate message whose process method * responds to the request. - * */ - public class SearchLoadAndWriteProcessor implements MembershipListener { private static final Logger logger = LogService.getLogger(); public static final int SMALL_BLOB_SIZE = - Integer.getInteger("DistributionManager.OptimizedUpdateByteLimit", 2000).intValue(); + Integer.getInteger("DistributionManager.OptimizedUpdateByteLimit", 2000); static final long RETRY_TIME = - Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "search-retry-interval", 2000).longValue(); - + Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "search-retry-interval", 2000); private volatile InternalDistributedMember selectedNode; private boolean selectedNodeDead = false; @@ -200,7 +227,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { } finally { if (event != listenerEvent) { if (listenerEvent instanceof EntryEventImpl) { - ((EntryEventImpl) listenerEvent).release(); + ((Releasable) listenerEvent).release(); } } } @@ -334,7 +361,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { if (this.advisor != null) { this.advisor.removeMembershipListener(this); } - } catch (IllegalArgumentException e) { + } catch (IllegalArgumentException ignore) { } finally { getProcessorKeeper().remove(this.processorId); } @@ -343,13 +370,9 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { void remove() { getProcessorKeeper().remove(this.processorId); - } - - void initialize(LocalRegion theRegion, Object theKey, Object theCallbackArg) { - this.region = theRegion; this.regionName = theRegion.getFullPath(); this.key = theKey; @@ -358,10 +381,9 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { Scope scope = attrs.getScope(); if (scope.isDistributed()) { this.advisor = ((CacheDistributionAdvisee) this.region).getCacheDistributionAdvisor(); - this.distributionManager = ((CacheDistributionAdvisee) theRegion).getDistributionManager(); + this.distributionManager = theRegion.getDistributionManager(); this.timeout = getSearchTimeout(); this.advisor.addMembershipListener(this); - } } @@ -369,7 +391,6 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { this.key = key; } - /************** Protected Methods ********************/ protected void setSelectedNode(InternalDistributedMember selectedNode) { this.selectedNode = selectedNode; this.selectedNodeDead = false; @@ -383,18 +404,14 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { return this.key; } - /************** Package Methods **********************/ - InternalDistributedMember getSelectedNode() { return this.selectedNode; } - /************** Private Methods **********************/ /** * Even though SearchLoadAndWriteProcessor may be in invoked in the context of a local region, * most of the services it provides are relevant to distribution only. The 3 services it provides * are netSearch, netLoad, netWrite - * */ private SearchLoadAndWriteProcessor() { resetResults(); @@ -410,7 +427,6 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { this.responseQueue = null; } - /** * If we have a local cache loader and the region is not global, then invoke the loader If the * region is local, or the result is non-null, then return whatever the loader returned do a @@ -614,7 +630,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { .toLocalizedString(key)); } break; - } catch (InterruptedException e) { + } catch (InterruptedException ignore) { interrupted = true; region.getCancelCriterion().checkCancelInProgress(null); // continue; @@ -871,7 +887,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { } finally { if (event != pevent) { if (event instanceof EntryEventImpl) { - ((EntryEventImpl) event).release(); + ((Releasable) event).release(); } } } @@ -1005,7 +1021,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { this.remoteGetInProgress = true; setSelectedNode(sender); return; // sendValueRequest does the rest of the work - } catch (RejectedExecutionException ex) { + } catch (RejectedExecutionException ignore) { // just fall through since we must be shutting down. } } @@ -1195,7 +1211,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { if (waitTimeMs <= 0) { throw new TimeoutException( LocalizedStrings.SearchLoadAndWriteProcessor_TIMED_OUT_WHILE_DOING_NETSEARCHNETLOADNETWRITE_PROCESSORID_0_KEY_IS_1 - .toLocalizedString(new Object[] {Integer.valueOf(this.processorId), this.key})); + .toLocalizedString(new Object[] {this.processorId, this.key})); } boolean interrupted = Thread.interrupted(); @@ -1229,14 +1245,14 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { sb.append(" msRemaining=").append(waitTimeMs); } if (lastNS != 0) { - sb.append(" lastNotifySpot=" + lastNS); + sb.append(" lastNotifySpot=").append(lastNS); } throw new TimeoutException( LocalizedStrings.SearchLoadAndWriteProcessor_TIMEOUT_DURING_NETSEARCHNETLOADNETWRITE_DETAILS_0 .toLocalizedString(sb)); } return; - } catch (InterruptedException e) { + } catch (InterruptedException ignore) { interrupted = true; region.getCancelCriterion().checkCancelInProgress(null); // keep waiting until we are done @@ -1305,14 +1321,14 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { DiskRegion dr = rgn.getDiskRegion(); if (dr != null) { dr.setClearCountReference(); - } ; + } } protected static void removeClearCountReference(LocalRegion rgn) { DiskRegion dr = rgn.getDiskRegion(); if (dr != null) { dr.removeClearCountReference(); - } ; + } } /** @@ -1326,12 +1342,6 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { nMsg.doGet((DistributionManager) this.distributionManager); } - /***************************************************************************** - * INNER CLASSES - *****************************************************************************/ - - - /** * A QueryMessage is broadcast to every node that has the region defined, to find out who has a * valid copy of the requested object. @@ -1368,7 +1378,9 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { private static final short HAS_IDLE_TIME = (HAS_TTL << 1); private static final short ALWAYS_SEND_RESULT = (HAS_IDLE_TIME << 1); - public QueryMessage() {}; + public QueryMessage() { + // do nothing + } /** * Using a new or pooled message instance, create and send the query to all nodes. @@ -1492,8 +1504,8 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { try { // check to see if we would have to wait on initialization latch (if global) // if so abort and reply with null - GemFireCacheImpl gfc = (GemFireCacheImpl) CacheFactory.getInstance(dm.getSystem()); - if (gfc.isGlobalRegionInitializing(this.regionName)) { + InternalCache cache = (InternalCache) CacheFactory.getInstance(dm.getSystem()); + if (cache.isGlobalRegionInitializing(this.regionName)) { replyWithNull(dm); if (logger.isDebugEnabled()) { logger.debug("Global Region not initialized yet"); @@ -1512,31 +1524,28 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { if (entry != null) { synchronized (entry) { assert region.isInitialized(); - { - if (dm.cacheTimeMillis() - startTime < timeoutMs) { - o = region.getNoLRU(this.key, false, true, true); // OFFHEAP: incrc, copy bytes, - // decrc - if (o != null && !Token.isInvalid(o) && !Token.isRemoved(o) - && !region.isExpiredWithRegardTo(this.key, this.ttl, this.idleTime)) { - isPresent = true; - VersionStamp stamp = entry.getVersionStamp(); - if (stamp != null && stamp.hasValidVersion()) { - tag = stamp.asVersionTag(); - } - long lastModified = entry.getLastModified(); - lastModifiedCacheTime = lastModified; - isSer = o instanceof CachedDeserializable; - if (isSer) { - o = ((CachedDeserializable) o).getSerializedValue(); - } - if (isPresent && (this.alwaysSendResult - || (ObjectSizer.DEFAULT.sizeof(o) < SMALL_BLOB_SIZE))) { - sendResult = true; - } + if (dm.cacheTimeMillis() - startTime < timeoutMs) { + o = region.getNoLRU(this.key, false, true, true); // OFFHEAP: incrc, copy bytes, + // decrc + if (o != null && !Token.isInvalid(o) && !Token.isRemoved(o) + && !region.isExpiredWithRegardTo(this.key, this.ttl, this.idleTime)) { + isPresent = true; + VersionStamp stamp = entry.getVersionStamp(); + if (stamp != null && stamp.hasValidVersion()) { + tag = stamp.asVersionTag(); + } + lastModifiedCacheTime = entry.getLastModified(); + isSer = o instanceof CachedDeserializable; + if (isSer) { + o = ((CachedDeserializable) o).getSerializedValue(); + } + if (isPresent && (this.alwaysSendResult + || (ObjectSizer.DEFAULT.sizeof(o) < SMALL_BLOB_SIZE))) { + sendResult = true; } - } else { - requestorTimedOut = true; } + } else { + requestorTimedOut = true; } } } else if (logger.isDebugEnabled()) { @@ -1549,10 +1558,10 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { ResponseMessage.sendMessage(this.key, this.getSender(), processorId, (sendResult ? o : null), lastModifiedCacheTime, isPresent, isSer, requestorTimedOut, dm, tag); - } catch (RegionDestroyedException rde) { + } catch (RegionDestroyedException ignore) { logger.debug("Region Destroyed Exception in QueryMessage doGet, null"); replyWithNull(dm); - } catch (CancelException cce) { + } catch (CancelException ignore) { logger.debug("CacheClosedException in QueryMessage doGet, null"); replyWithNull(dm); } catch (VirtualMachineError err) { @@ -1577,14 +1586,9 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { private void replyWithNull(DistributionManager dm) { ResponseMessage.sendMessage(this.key, this.getSender(), processorId, null, 0, false, false, false, dm, null); - } - } - /********************* ResponseMessage ***************************************/ - - /** * The ResponseMessage is a reply to a QueryMessage, and contains the object's value, if it is * below the byte limit, otherwise an indication of whether the sender has the value. @@ -1605,7 +1609,6 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { /** is the value present */ private boolean isPresent; - /** Is blob serialized? */ private boolean isSerialized; @@ -1865,11 +1868,10 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { // bytes, decrc if (eov != null) { if (eov == Token.INVALID || eov == Token.LOCAL_INVALID) { - // ebv = null; (redundant assignment) + // nothing? } else if (dm.cacheTimeMillis() - startTime < timeoutMs) { if (!region.isExpiredWithRegardTo(this.key, this.ttl, this.idleTime)) { - long lastModified = entry.getLastModified(); - lastModifiedCacheTime = lastModified; + lastModifiedCacheTime = entry.getLastModified(); if (eov instanceof CachedDeserializable) { CachedDeserializable cd = (CachedDeserializable) eov; if (!cd.isSerialized()) { @@ -1911,10 +1913,10 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { NetSearchReplyMessage.sendMessage(NetSearchRequestMessage.this.getSender(), processorId, this.key, ebv, ebvObj, ebvLen, lastModifiedCacheTime, isSer, requestorTimedOut, authoritative, dm, versionTag); - } catch (RegionDestroyedException rde) { + } catch (RegionDestroyedException ignore) { replyWithNull(dm); - } catch (CancelException cce) { + } catch (CancelException ignore) { replyWithNull(dm); } catch (VirtualMachineError err) { @@ -1940,13 +1942,9 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { private void replyWithNull(DistributionManager dm) { NetSearchReplyMessage.sendMessage(NetSearchRequestMessage.this.getSender(), processorId, this.key, null, null, 0, 0, false, false, false, dm, null); - } - } - /********************* NetSearchReplyMessage ***************************************/ - /** * The NetSearchReplyMessage is a reply to a NetSearchRequestMessage, and contains the object's * value. @@ -1961,8 +1959,6 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { /** The gemfire id of the SearchLoadAndWrite object waiting for response */ private int processorId; - - /** The object value being transferred */ private byte[] value; @@ -2150,7 +2146,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { try { processor.distributionManager.putOutgoingUserData(msg); - } catch (NotSerializableException e) { + } catch (NotSerializableException ignore) { throw new IllegalArgumentException( LocalizedStrings.SearchLoadAndWriteProcessor_MESSAGE_NOT_SERIALIZABLE .toLocalizedString()); @@ -2210,13 +2206,11 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { + "\" in region \"" + this.regionName + "\", processorId " + processorId; } - - private void doLoad(DistributionManager dm) { long startTime = dm.cacheTimeMillis(); int oldLevel = LocalRegion.setThreadInitLevelRequirement(LocalRegion.BEFORE_INITIAL_IMAGE); try { - GemFireCacheImpl gfc = (GemFireCacheImpl) CacheFactory.getInstance(dm.getSystem()); + InternalCache gfc = (InternalCache) CacheFactory.getInstance(dm.getSystem()); LocalRegion region = (LocalRegion) gfc.getRegion(this.regionName); if (region != null && region.isInitialized() && (dm.cacheTimeMillis() - startTime < timeoutMs)) { @@ -2282,16 +2276,9 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { void replyWithException(Exception e, DistributionManager dm) { NetLoadReplyMessage.sendMessage(NetLoadRequestMessage.this.getSender(), processorId, null, dm, this.aCallbackArgument, e, false, false); - } - - } - - - /********************* NetLoadReplyMessage ***************************************/ - /** * The NetLoadReplyMessage is a reply to a RequestMessage, and contains the object's value. */ @@ -2303,7 +2290,6 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { /** The object value being transferred */ private Object result; - /** Loader parameter returned to sender */ private Object aCallbackArgument; @@ -2481,7 +2467,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { long startTime = dm.cacheTimeMillis(); int oldLevel = LocalRegion.setThreadInitLevelRequirement(LocalRegion.BEFORE_INITIAL_IMAGE); try { - GemFireCacheImpl gfc = (GemFireCacheImpl) CacheFactory.getInstance(dm.getSystem()); + InternalCache gfc = (InternalCache) CacheFactory.getInstance(dm.getSystem()); LocalRegion region = (LocalRegion) gfc.getRegion(this.regionName); if (region != null && region.isInitialized() && (dm.cacheTimeMillis() - startTime < timeoutMs)) { @@ -2560,7 +2546,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { true); } - } catch (RegionDestroyedException rde) { + } catch (RegionDestroyedException ignore) { NetWriteReplyMessage.sendMessage(NetWriteRequestMessage.this.getSender(), processorId, dm, false, null, false); @@ -2594,16 +2580,9 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { } finally { LocalRegion.setThreadInitLevelRequirement(oldLevel); } - - - } - - } - /********************* NetWriteReplyMessage *********************************/ - /** * The NetWriteReplyMessage is a reply to a NetWriteRequestMessage, and contains the success code * or exception that is propagated back to the requestor http://git-wip-us.apache.org/repos/asf/geode/blob/640b9f03/geode-core/src/main/java/org/apache/geode/internal/cache/ServerPingMessage.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ServerPingMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ServerPingMessage.java index aa37880..7f28d5a 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/ServerPingMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ServerPingMessage.java @@ -56,7 +56,7 @@ public class ServerPingMessage extends PooledDistributionMessage { * * @return true if all the recipients are pingable */ - public static boolean send(GemFireCacheImpl cache, Set<InternalDistributedMember> recipients) { + public static boolean send(InternalCache cache, Set<InternalDistributedMember> recipients) { InternalDistributedSystem ids = cache.getInternalDistributedSystem(); DM dm = ids.getDistributionManager(); http://git-wip-us.apache.org/repos/asf/geode/blob/640b9f03/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java index eb93b76..c745754 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java @@ -12,7 +12,6 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ - package org.apache.geode.internal.cache; import java.io.DataInput; @@ -100,18 +99,16 @@ public class StateFlushOperation { private DM dm; - /** flush current ops to the given members for the given region */ public static void flushTo(Set<InternalDistributedMember> targets, DistributedRegion region) { DM dm = region.getDistributionManager(); - DistributedRegion r = region; - boolean initialized = r.isInitialized(); + boolean initialized = region.isInitialized(); if (initialized) { - r.getDistributionAdvisor().forceNewMembershipVersion(); // force a new "view" so we can track - // current ops + // force a new "view" so we can track current ops + region.getDistributionAdvisor().forceNewMembershipVersion(); try { - r.getDistributionAdvisor().waitForCurrentOperations(); - } catch (RegionDestroyedException e) { + region.getDistributionAdvisor().waitForCurrentOperations(); + } catch (RegionDestroyedException ignore) { return; } } @@ -137,14 +134,14 @@ public class StateFlushOperation { processors.add(processor); } - if (r.getRegionMap().getARMLockTestHook() != null) { - r.getRegionMap().getARMLockTestHook().beforeStateFlushWait(); + if (region.getRegionMap().getARMLockTestHook() != null) { + region.getRegionMap().getARMLockTestHook().beforeStateFlushWait(); } for (ReplyProcessor21 processor : processors) { try { processor.waitForReplies(); - } catch (InterruptedException e) { + } catch (InterruptedException ignore) { Thread.currentThread().interrupt(); return; } @@ -319,7 +316,7 @@ public class StateFlushOperation { // 36175) int oldLevel = LocalRegion.setThreadInitLevelRequirement(LocalRegion.BEFORE_INITIAL_IMAGE); try { - GemFireCacheImpl gfc = (GemFireCacheImpl) CacheFactory.getInstance(dm.getSystem()); + InternalCache gfc = (InternalCache) CacheFactory.getInstance(dm.getSystem()); Region r = gfc.getRegionByPathForProcessing(this.regionPath); if (r instanceof DistributedRegion) { region = (DistributedRegion) r; @@ -336,9 +333,9 @@ public class StateFlushOperation { // 36175) int oldLevel = LocalRegion.setThreadInitLevelRequirement(LocalRegion.BEFORE_INITIAL_IMAGE); try { - GemFireCacheImpl gfc = (GemFireCacheImpl) CacheFactory.getInstance(dm.getSystem()); + InternalCache cache = (InternalCache) CacheFactory.getInstance(dm.getSystem()); Set<DistributedRegion> result = new HashSet(); - for (LocalRegion r : gfc.getAllRegions()) { + for (LocalRegion r : cache.getAllRegions()) { // it's important not to check if the cache is closing, so access // the isDestroyed boolean directly if (r instanceof DistributedRegion && !r.isDestroyed) { @@ -400,7 +397,7 @@ public class StateFlushOperation { } try { r.getDistributionAdvisor().waitForCurrentOperations(); - } catch (RegionDestroyedException e) { + } catch (RegionDestroyedException ignore) { // continue with the next region } } @@ -422,7 +419,7 @@ public class StateFlushOperation { } } } - } catch (CancelException cce) { + } catch (CancelException ignore) { // cache is closed - no distribution advisor available for the region so nothing to do but // send the stabilization message } catch (Exception e) { @@ -530,7 +527,7 @@ public class StateFlushOperation { return "unknown channelState content"; } else { Map csmap = (Map) state; - StringBuffer result = new StringBuffer(200); + StringBuilder result = new StringBuilder(200); for (Iterator it = csmap.entrySet().iterator(); it.hasNext();) { Map.Entry entry = (Map.Entry) it.next(); result.append(entry.getKey()).append('=').append(entry.getValue()); @@ -565,7 +562,7 @@ public class StateFlushOperation { try { dm.getMembershipManager().waitForMessageState(getSender(), channelState); break; - } catch (InterruptedException e) { + } catch (InterruptedException ignore) { interrupted = true; } finally { if (interrupted) { @@ -697,7 +694,7 @@ public class StateFlushOperation { @Override public String toString() { - StringBuffer sb = new StringBuffer(); + StringBuilder sb = new StringBuilder(); sb.append("StateStabilizedMessage "); sb.append(this.processorId); if (super.getSender() != null) {