http://git-wip-us.apache.org/repos/asf/geode/blob/640b9f03/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java index da80fa6..8b74f27 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java @@ -12,34 +12,141 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ - package org.apache.geode.internal.cache; +import static org.apache.geode.internal.lang.SystemUtils.*; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Hashtable; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; + +import org.apache.logging.log4j.Logger; + import org.apache.geode.CancelException; import org.apache.geode.InternalGemFireException; import org.apache.geode.StatisticsFactory; import org.apache.geode.SystemFailure; -import org.apache.geode.cache.*; +import org.apache.geode.cache.AttributesFactory; +import org.apache.geode.cache.AttributesMutator; +import org.apache.geode.cache.Cache; +import org.apache.geode.cache.CacheClosedException; +import org.apache.geode.cache.CacheException; +import org.apache.geode.cache.CacheListener; +import org.apache.geode.cache.CacheLoader; +import org.apache.geode.cache.CacheLoaderException; +import org.apache.geode.cache.CacheStatistics; +import org.apache.geode.cache.CacheWriter; +import org.apache.geode.cache.CacheWriterException; +import org.apache.geode.cache.CustomExpiry; +import org.apache.geode.cache.DataPolicy; +import org.apache.geode.cache.DiskAccessException; +import org.apache.geode.cache.EntryExistsException; +import org.apache.geode.cache.EntryNotFoundException; +import org.apache.geode.cache.ExpirationAttributes; +import org.apache.geode.cache.InterestPolicy; +import org.apache.geode.cache.InterestRegistrationEvent; +import org.apache.geode.cache.LoaderHelper; +import org.apache.geode.cache.LowMemoryException; +import org.apache.geode.cache.Operation; +import org.apache.geode.cache.PartitionAttributes; +import org.apache.geode.cache.PartitionResolver; +import org.apache.geode.cache.PartitionedRegionDistributionException; +import org.apache.geode.cache.PartitionedRegionStorageException; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionAttributes; +import org.apache.geode.cache.RegionDestroyedException; +import org.apache.geode.cache.RegionEvent; +import org.apache.geode.cache.RegionExistsException; +import org.apache.geode.cache.RegionMembershipListener; import org.apache.geode.cache.TimeoutException; +import org.apache.geode.cache.TransactionDataNotColocatedException; +import org.apache.geode.cache.TransactionDataRebalancedException; +import org.apache.geode.cache.TransactionException; import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl; -import org.apache.geode.cache.client.internal.*; -import org.apache.geode.cache.execute.*; +import org.apache.geode.cache.client.internal.ClientMetadataService; +import org.apache.geode.cache.execute.EmptyRegionFunctionException; +import org.apache.geode.cache.execute.Function; +import org.apache.geode.cache.execute.FunctionContext; +import org.apache.geode.cache.execute.FunctionException; +import org.apache.geode.cache.execute.FunctionService; +import org.apache.geode.cache.execute.ResultCollector; import org.apache.geode.cache.partition.PartitionListener; import org.apache.geode.cache.partition.PartitionNotAvailableException; import org.apache.geode.cache.persistence.PartitionOfflineException; import org.apache.geode.cache.persistence.PersistentID; -import org.apache.geode.cache.query.*; -import org.apache.geode.cache.query.internal.*; -import org.apache.geode.cache.query.internal.index.*; +import org.apache.geode.cache.query.FunctionDomainException; +import org.apache.geode.cache.query.Index; +import org.apache.geode.cache.query.IndexCreationException; +import org.apache.geode.cache.query.IndexExistsException; +import org.apache.geode.cache.query.IndexInvalidException; +import org.apache.geode.cache.query.IndexNameConflictException; +import org.apache.geode.cache.query.IndexType; +import org.apache.geode.cache.query.MultiIndexCreationException; +import org.apache.geode.cache.query.NameResolutionException; +import org.apache.geode.cache.query.QueryException; +import org.apache.geode.cache.query.QueryInvocationTargetException; +import org.apache.geode.cache.query.SelectResults; +import org.apache.geode.cache.query.TypeMismatchException; +import org.apache.geode.cache.query.internal.Bag; +import org.apache.geode.cache.query.internal.CompiledSelect; +import org.apache.geode.cache.query.internal.DefaultQuery; +import org.apache.geode.cache.query.internal.ExecutionContext; +import org.apache.geode.cache.query.internal.QCompiler; +import org.apache.geode.cache.query.internal.QueryExecutor; +import org.apache.geode.cache.query.internal.ResultsBag; +import org.apache.geode.cache.query.internal.ResultsCollectionWrapper; +import org.apache.geode.cache.query.internal.ResultsSet; +import org.apache.geode.cache.query.internal.index.AbstractIndex; +import org.apache.geode.cache.query.internal.index.IndexCreationData; +import org.apache.geode.cache.query.internal.index.IndexManager; +import org.apache.geode.cache.query.internal.index.IndexUtils; +import org.apache.geode.cache.query.internal.index.PartitionedIndex; import org.apache.geode.cache.query.internal.types.ObjectTypeImpl; import org.apache.geode.cache.query.types.ObjectType; import org.apache.geode.cache.wan.GatewaySender; import org.apache.geode.distributed.DistributedLockService; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.LockServiceDestroyedException; -import org.apache.geode.distributed.internal.*; +import org.apache.geode.distributed.internal.DM; +import org.apache.geode.distributed.internal.DistributionAdvisee; +import org.apache.geode.distributed.internal.DistributionAdvisor; import org.apache.geode.distributed.internal.DistributionAdvisor.Profile; +import org.apache.geode.distributed.internal.DistributionConfig; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.distributed.internal.InternalDistributedSystem.DisconnectListener; +import org.apache.geode.distributed.internal.MembershipListener; +import org.apache.geode.distributed.internal.ProfileListener; +import org.apache.geode.distributed.internal.ReplyException; +import org.apache.geode.distributed.internal.ReplyProcessor21; import org.apache.geode.distributed.internal.locks.DLockRemoteToken; import org.apache.geode.distributed.internal.locks.DLockService; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; @@ -58,27 +165,64 @@ import org.apache.geode.internal.cache.control.InternalResourceManager; import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceType; import org.apache.geode.internal.cache.control.MemoryEvent; import org.apache.geode.internal.cache.control.MemoryThresholds; -import org.apache.geode.internal.cache.execute.*; +import org.apache.geode.internal.cache.execute.AbstractExecution; +import org.apache.geode.internal.cache.execute.FunctionExecutionNodePruner; +import org.apache.geode.internal.cache.execute.FunctionRemoteContext; +import org.apache.geode.internal.cache.execute.InternalFunctionInvocationTargetException; +import org.apache.geode.internal.cache.execute.LocalResultCollector; +import org.apache.geode.internal.cache.execute.PartitionedRegionFunctionExecutor; +import org.apache.geode.internal.cache.execute.PartitionedRegionFunctionResultSender; +import org.apache.geode.internal.cache.execute.PartitionedRegionFunctionResultWaiter; +import org.apache.geode.internal.cache.execute.RegionFunctionContextImpl; +import org.apache.geode.internal.cache.execute.ServerToClientFunctionResultSender; import org.apache.geode.internal.cache.ha.ThreadIdentifier; import org.apache.geode.internal.cache.lru.HeapEvictor; import org.apache.geode.internal.cache.lru.LRUStatistics; -import org.apache.geode.internal.cache.partitioned.*; +import org.apache.geode.internal.cache.lru.Sizeable; +import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage; import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage.ContainsKeyValueResponse; +import org.apache.geode.internal.cache.partitioned.DestroyMessage; import org.apache.geode.internal.cache.partitioned.DestroyMessage.DestroyResponse; +import org.apache.geode.internal.cache.partitioned.DestroyRegionOnDataStoreMessage; +import org.apache.geode.internal.cache.partitioned.DumpAllPRConfigMessage; +import org.apache.geode.internal.cache.partitioned.DumpB2NRegion; import org.apache.geode.internal.cache.partitioned.DumpB2NRegion.DumpB2NResponse; +import org.apache.geode.internal.cache.partitioned.DumpBucketsMessage; +import org.apache.geode.internal.cache.partitioned.FetchBulkEntriesMessage; import org.apache.geode.internal.cache.partitioned.FetchBulkEntriesMessage.FetchBulkEntriesResponse; +import org.apache.geode.internal.cache.partitioned.FetchEntriesMessage; import org.apache.geode.internal.cache.partitioned.FetchEntriesMessage.FetchEntriesResponse; +import org.apache.geode.internal.cache.partitioned.FetchEntryMessage; import org.apache.geode.internal.cache.partitioned.FetchEntryMessage.FetchEntryResponse; +import org.apache.geode.internal.cache.partitioned.FetchKeysMessage; import org.apache.geode.internal.cache.partitioned.FetchKeysMessage.FetchKeysResponse; +import org.apache.geode.internal.cache.partitioned.GetMessage; import org.apache.geode.internal.cache.partitioned.GetMessage.GetResponse; +import org.apache.geode.internal.cache.partitioned.IdentityRequestMessage; import org.apache.geode.internal.cache.partitioned.IdentityRequestMessage.IdentityResponse; +import org.apache.geode.internal.cache.partitioned.IdentityUpdateMessage; import org.apache.geode.internal.cache.partitioned.IdentityUpdateMessage.IdentityUpdateResponse; +import org.apache.geode.internal.cache.partitioned.IndexCreationMsg; +import org.apache.geode.internal.cache.partitioned.InterestEventMessage; import org.apache.geode.internal.cache.partitioned.InterestEventMessage.InterestEventResponse; +import org.apache.geode.internal.cache.partitioned.InvalidateMessage; import org.apache.geode.internal.cache.partitioned.InvalidateMessage.InvalidateResponse; +import org.apache.geode.internal.cache.partitioned.PREntriesIterator; +import org.apache.geode.internal.cache.partitioned.PRLocallyDestroyedException; +import org.apache.geode.internal.cache.partitioned.PRSanityCheckMessage; +import org.apache.geode.internal.cache.partitioned.PRUpdateEntryVersionMessage; import org.apache.geode.internal.cache.partitioned.PRUpdateEntryVersionMessage.UpdateEntryVersionResponse; import org.apache.geode.internal.cache.partitioned.PartitionMessage.PartitionResponse; +import org.apache.geode.internal.cache.partitioned.PartitionedRegionObserver; +import org.apache.geode.internal.cache.partitioned.PartitionedRegionObserverHolder; +import org.apache.geode.internal.cache.partitioned.PutAllPRMessage; +import org.apache.geode.internal.cache.partitioned.PutMessage; import org.apache.geode.internal.cache.partitioned.PutMessage.PutResult; +import org.apache.geode.internal.cache.partitioned.RegionAdvisor; import org.apache.geode.internal.cache.partitioned.RegionAdvisor.PartitionProfile; +import org.apache.geode.internal.cache.partitioned.RemoveAllPRMessage; +import org.apache.geode.internal.cache.partitioned.RemoveIndexesMessage; +import org.apache.geode.internal.cache.partitioned.SizeMessage; import org.apache.geode.internal.cache.partitioned.SizeMessage.SizeResponse; import org.apache.geode.internal.cache.persistence.PRPersistentConfig; import org.apache.geode.internal.cache.tier.InterestType; @@ -107,30 +251,18 @@ import org.apache.geode.internal.offheap.annotations.Unretained; import org.apache.geode.internal.sequencelog.RegionLogger; import org.apache.geode.internal.util.TransformUtils; import org.apache.geode.internal.util.concurrent.StoppableCountDownLatch; -import org.apache.logging.log4j.Logger; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Serializable; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Lock; /** * A Region whose total storage is split into chunks of data (partitions) which are copied up to a * configurable level (for high availability) and placed on multiple VMs for improved performance * and increased storage capacity. - * */ public class PartitionedRegion extends LocalRegion implements CacheDistributionAdvisee, QueryExecutor { - public static final Random rand = + public static final Random RANDOM = new Random(Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "PartitionedRegionRandomSeed", - NanoTimer.getTime()).longValue()); + NanoTimer.getTime())); private static final AtomicInteger SERIAL_NUMBER_GENERATOR = new AtomicInteger(); @@ -143,7 +275,7 @@ public class PartitionedRegion extends LocalRegion * getNetworkHopType byte indicating this was not the bucket owner and a message had to be sent to * a primary in the same server group */ - public static final int NETWORK_HOP_TO_SAME_GROUP = 1; + private static final int NETWORK_HOP_TO_SAME_GROUP = 1; /** * getNetworkHopType byte indicating this was not the bucket owner and a message had to be sent to @@ -151,12 +283,12 @@ public class PartitionedRegion extends LocalRegion */ public static final int NETWORK_HOP_TO_DIFFERENT_GROUP = 2; - private final DiskRegionStats diskRegionStats; + /** * Changes scope of replication to secondary bucket to SCOPE.DISTRIBUTED_NO_ACK */ - public static final boolean DISABLE_SECONDARY_BUCKET_ACK = + static final boolean DISABLE_SECONDARY_BUCKET_ACK = Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "disablePartitionedRegionBucketAck"); /** @@ -170,11 +302,11 @@ public class PartitionedRegion extends LocalRegion private static ThreadLocal threadRandom = new ThreadLocal() { @Override protected Object initialValue() { - int i = rand.nextInt(); + int i = RANDOM.nextInt(); if (i < 0) { i = -1 * i; } - return Integer.valueOf(i); + return i; } }; @@ -203,7 +335,7 @@ public class PartitionedRegion extends LocalRegion private boolean cleanPRRegistration = false; /** Time to wait for for acquiring distributed lock ownership */ - final static long VM_OWNERSHIP_WAIT_TIME = PRSystemPropertyGetter.parseLong( + private static final long VM_OWNERSHIP_WAIT_TIME = PRSystemPropertyGetter.parseLong( System.getProperty(PartitionedRegionHelper.VM_OWNERSHIP_WAIT_TIME_PROPERTY), PartitionedRegionHelper.VM_OWNERSHIP_WAIT_TIME_DEFAULT); @@ -279,7 +411,7 @@ public class PartitionedRegion extends LocalRegion * * Concurrency: {@link #isLocallyDestroyed} is volatile */ - public Thread locallyDestroyingThread; + Thread locallyDestroyingThread; // TODO someone please add a javadoc for this private volatile boolean hasPartitionedIndex = false; @@ -319,8 +451,7 @@ public class PartitionedRegion extends LocalRegion private ScheduledExecutorService bucketSorter; - private ConcurrentMap<String, Integer[]> partitionsMap = - new ConcurrentHashMap<String, Integer[]>(); + private ConcurrentMap<String, Integer[]> partitionsMap = new ConcurrentHashMap<>(); public ConcurrentMap<String, Integer[]> getPartitionsMap() { return this.partitionsMap; @@ -337,34 +468,33 @@ public class PartitionedRegion extends LocalRegion * Byte 0 = no NWHOP Byte 1 = NWHOP to servers in same server-grp Byte 2 = NWHOP tp servers in * other server-grp */ - private final ThreadLocal<Byte> networkHopType = new ThreadLocal<Byte>() { + private static final ThreadLocal<Byte> networkHopType = new ThreadLocal<Byte>() { @Override protected Byte initialValue() { - return Byte.valueOf((byte) NETWORK_HOP_NONE); + return (byte) NETWORK_HOP_NONE; } }; public void clearNetworkHopData() { - this.networkHopType.remove(); + networkHopType.remove(); this.metadataVersion.remove(); } private void setNetworkHopType(Byte value) { - this.networkHopType.set(value); + networkHopType.set(value); } /** - * <p> * If the last operation in the current thread required a one-hop to another server who held the * primary bucket for the operation then this will return something other than NETWORK_HOP_NONE. - * </p> + * <p> * see NETWORK_HOP_NONE, NETWORK_HOP_TO_SAME_GROUP and NETWORK_HOP_TO_DIFFERENT_GROUP */ public byte getNetworkHopType() { - return this.networkHopType.get().byteValue(); + return networkHopType.get(); } - private final ThreadLocal<Byte> metadataVersion = new ThreadLocal<Byte>() { + private static final ThreadLocal<Byte> metadataVersion = new ThreadLocal<Byte>() { @Override protected Byte initialValue() { return ClientMetadataService.INITIAL_VERSION; @@ -372,14 +502,13 @@ public class PartitionedRegion extends LocalRegion }; private void setMetadataVersion(Byte value) { - this.metadataVersion.set(value); + metadataVersion.set(value); } public byte getMetadataVersion() { - return this.metadataVersion.get().byteValue(); + return metadataVersion.get(); } - /** * Returns the LRUStatistics for this PR. This is needed to find the single instance of * LRUStatistics created early for a PR when it is recovered from disk. This fixes bug 41938 @@ -392,9 +521,6 @@ public class PartitionedRegion extends LocalRegion return result; } - - ////////////////// ConcurrentMap methods ////////////////// - @Override public boolean remove(Object key, Object value, Object callbackArg) { final long startTime = PartitionedRegionStats.startTime(); @@ -405,11 +531,6 @@ public class PartitionedRegion extends LocalRegion } } - - - ////////////////// End of ConcurrentMap methods ////////////////// - - public PartitionListener[] getPartitionListeners() { return this.partitionListeners; } @@ -471,11 +592,11 @@ public class PartitionedRegion extends LocalRegion public Object getRegion(Object key) throws PRLocallyDestroyedException { if (cleared) { - Cache c = GemFireCacheImpl.getInstance(); - if (c == null) { + Cache cache = GemFireCacheImpl.getInstance(); + if (cache == null) { throw new CacheClosedException(); } else { - c.getCancelCriterion().checkCancelInProgress(null); + cache.getCancelCriterion().checkCancelInProgress(null); } } Assert.assertTrue(key instanceof Integer); @@ -527,12 +648,11 @@ public class PartitionedRegion extends LocalRegion } Assert.assertTrue(key instanceof Integer); if (sendIdentityRequestMessage) - IdentityRequestMessage.setLatestId(((Integer) key).intValue()); + IdentityRequestMessage.setLatestId((Integer) key); if ((super.get(key) == DESTROYED) && (value instanceof PartitionedRegion)) { - PartitionedRegionException pre = new PartitionedRegionException( + throw new PartitionedRegionException( LocalizedStrings.PartitionedRegion_CAN_NOT_REUSE_OLD_PARTITIONED_REGION_ID_0 .toLocalizedString(key)); - throw pre; } return super.put(key, value); } @@ -544,26 +664,24 @@ public class PartitionedRegion extends LocalRegion } public synchronized String dump() { - StringBuffer b = new StringBuffer("prIdToPR Map@"); - b.append(System.identityHashCode(prIdToPR)).append(":\n"); - Map.Entry me; - for (Iterator i = prIdToPR.entrySet().iterator(); i.hasNext();) { - me = (Map.Entry) i.next(); - b.append(me.getKey()).append("=>").append(me.getValue()); - if (i.hasNext()) { - b.append("\n"); + StringBuilder sb = new StringBuilder("prIdToPR Map@"); + sb.append(System.identityHashCode(prIdToPR)).append(':').append(getLineSeparator()); + Map.Entry mapEntry; + for (Iterator iterator = prIdToPR.entrySet().iterator(); iterator.hasNext();) { + mapEntry = (Map.Entry) iterator.next(); + sb.append(mapEntry.getKey()).append("=>").append(mapEntry.getValue()); + if (iterator.hasNext()) { + sb.append(getLineSeparator()); } } - return b.toString(); + return sb.toString(); } } private int partitionedRegionId = -3; - // final private Scope userScope; - /** Node description */ - final private Node node; + private final Node node; /** Helper Object for redundancy Management of PartitionedRegion */ private final PRHARedundancyProvider redundancyProvider; @@ -578,15 +696,7 @@ public class PartitionedRegion extends LocalRegion */ private final StoppableCountDownLatch initializationLatchAfterBucketIntialization; - /** - * Constructor for a PartitionedRegion. This has an accessor (Region API) functionality and - * contains a datastore for actual storage. An accessor can act as a local cache by having a local - * storage enabled. A PartitionedRegion can be created by a factory method of RegionFactory.java - * and also by invoking Cache.createRegion(). (Cache.xml etc to be added) - * - */ - - static public final String RETRY_TIMEOUT_PROPERTY = + public static final String RETRY_TIMEOUT_PROPERTY = DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout"; private final PartitionRegionConfigValidator validator; @@ -604,16 +714,22 @@ public class PartitionedRegion extends LocalRegion private AbstractGatewaySender parallelGatewaySender = null; - public PartitionedRegion(String regionname, RegionAttributes ra, LocalRegion parentRegion, - GemFireCacheImpl cache, InternalRegionArguments internalRegionArgs) { - super(regionname, ra, parentRegion, cache, internalRegionArgs); + /** + * Constructor for a PartitionedRegion. This has an accessor (Region API) functionality and + * contains a datastore for actual storage. An accessor can act as a local cache by having a local + * storage enabled. A PartitionedRegion can be created by a factory method of RegionFactory.java + * and also by invoking Cache.createRegion(). (Cache.xml etc to be added) + */ + public PartitionedRegion(String regionName, RegionAttributes regionAttributes, + LocalRegion parentRegion, InternalCache cache, InternalRegionArguments internalRegionArgs) { + super(regionName, regionAttributes, parentRegion, cache, internalRegionArgs); this.node = initializeNode(); this.prStats = new PartitionedRegionStats(cache.getDistributedSystem(), getFullPath()); this.regionIdentifier = getFullPath().replace('/', '#'); if (logger.isDebugEnabled()) { - logger.debug("Constructing Partitioned Region {}", regionname); + logger.debug("Constructing Partitioned Region {}", regionName); } // By adding this disconnect listener we ensure that the pridmap is cleaned @@ -622,40 +738,37 @@ public class PartitionedRegion extends LocalRegion // (which prevents pridmap cleanup). cache.getInternalDistributedSystem().addDisconnectListener(dsPRIdCleanUpListener); - // this.userScope = ra.getScope(); - this.partitionAttributes = ra.getPartitionAttributes(); + this.partitionAttributes = regionAttributes.getPartitionAttributes(); this.localMaxMemory = this.partitionAttributes.getLocalMaxMemory(); this.retryTimeout = Integer.getInteger(RETRY_TIMEOUT_PROPERTY, - PartitionedRegionHelper.DEFAULT_TOTAL_WAIT_RETRY_ITERATION).intValue(); + PartitionedRegionHelper.DEFAULT_TOTAL_WAIT_RETRY_ITERATION); this.totalNumberOfBuckets = this.partitionAttributes.getTotalNumBuckets(); this.prStats.incTotalNumBuckets(this.totalNumberOfBuckets); - this.distAdvisor = RegionAdvisor.createRegionAdvisor(this); // Warning: potential early escape - // of instance - this.redundancyProvider = new PRHARedundancyProvider(this); // Warning: - // potential - // early escape - // instance + + // Warning: potential early escape of instance + this.distAdvisor = RegionAdvisor.createRegionAdvisor(this); + // Warning: potential early escape of instance + this.redundancyProvider = new PRHARedundancyProvider(this); // localCacheEnabled = ra.getPartitionAttributes().isLocalCacheEnabled(); // This is to make sure that local-cache get and put works properly. // getScope is overridden to return the correct scope. // this.scope = Scope.LOCAL; - this.redundantCopies = ra.getPartitionAttributes().getRedundantCopies(); - this.prStats.setConfiguredRedundantCopies(ra.getPartitionAttributes().getRedundantCopies()); - this.prStats.setLocalMaxMemory(ra.getPartitionAttributes().getLocalMaxMemory() * 1024L * 1024); + this.redundantCopies = regionAttributes.getPartitionAttributes().getRedundantCopies(); + this.prStats.setConfiguredRedundantCopies( + regionAttributes.getPartitionAttributes().getRedundantCopies()); + this.prStats.setLocalMaxMemory( + regionAttributes.getPartitionAttributes().getLocalMaxMemory() * 1024L * 1024); // No redundancy required for writes - this.minimumWriteRedundancy = - Integer - .getInteger( - DistributionConfig.GEMFIRE_PREFIX + "mimimumPartitionedRegionWriteRedundancy", 0) - .intValue(); + this.minimumWriteRedundancy = Integer.getInteger( + DistributionConfig.GEMFIRE_PREFIX + "mimimumPartitionedRegionWriteRedundancy", 0); + // No redundancy required for reads - this.minimumReadRedundancy = Integer - .getInteger(DistributionConfig.GEMFIRE_PREFIX + "mimimumPartitionedRegionReadRedundancy", 0) - .intValue(); + this.minimumReadRedundancy = Integer.getInteger( + DistributionConfig.GEMFIRE_PREFIX + "mimimumPartitionedRegionReadRedundancy", 0); - this.haveCacheLoader = ra.getCacheLoader() != null; + this.haveCacheLoader = regionAttributes.getCacheLoader() != null; this.initializationLatchAfterBucketIntialization = new StoppableCountDownLatch(this.getCancelCriterion(), 1); @@ -680,7 +793,7 @@ public class PartitionedRegion extends LocalRegion } if (logger.isDebugEnabled()) { - logger.debug("Partitioned Region {} constructed {}", regionname, + logger.debug("Partitioned Region {} constructed {}", regionName, (this.haveCacheLoader ? "with a cache loader" : "")); } if (this.getEvictionAttributes() != null @@ -757,7 +870,7 @@ public class PartitionedRegion extends LocalRegion }); } - public final boolean isShadowPR() { + public boolean isShadowPR() { return isShadowPR; } @@ -768,7 +881,7 @@ public class PartitionedRegion extends LocalRegion public Set<String> getParallelGatewaySenderIds() { Set<String> regionGatewaySenderIds = this.getAllGatewaySenderIds(); if (regionGatewaySenderIds.isEmpty()) { - return Collections.EMPTY_SET; + return Collections.emptySet(); } Set<GatewaySender> cacheGatewaySenders = getCache().getAllGatewaySenders(); Set<String> parallelGatewaySenderIds = new HashSet<String>(); @@ -804,10 +917,9 @@ public class PartitionedRegion extends LocalRegion if (config.getTotalNumBuckets() != this.getTotalNumberOfBuckets()) { Object[] prms = new Object[] {this.getFullPath(), this.getTotalNumberOfBuckets(), config.getTotalNumBuckets()}; - IllegalStateException ise = new IllegalStateException( + throw new IllegalStateException( LocalizedStrings.PartitionedRegion_FOR_REGION_0_TotalBucketNum_1_SHOULD_NOT_BE_CHANGED_Previous_Configured_2 .toString(prms)); - throw ise; } // Make sure we don't change to be colocated with a different region // We also can't change from colocated to not colocated without writing @@ -820,10 +932,9 @@ public class PartitionedRegion extends LocalRegion .toLocalizedString(this.getFullPath()), null, dsi); dsi.handleDiskAccessException(dae); - IllegalStateException ise = new IllegalStateException( + throw new IllegalStateException( LocalizedStrings.PartitionedRegion_FOR_REGION_0_ColocatedWith_1_SHOULD_NOT_BE_CHANGED_Previous_Configured_2 .toString(prms)); - throw ise; } } else { @@ -865,8 +976,6 @@ public class PartitionedRegion extends LocalRegion createAndValidatePersistentConfig(); initializePartitionedRegion(); - /* set the total number of buckets */ - // setTotalNumOfBuckets(); // If localMaxMemory is set to 0, do not initialize Data Store. final boolean storesData = this.localMaxMemory > 0; if (storesData) { @@ -1020,7 +1129,7 @@ public class PartitionedRegion extends LocalRegion if (!allGatewaySenderIds.isEmpty()) { for (GatewaySender sender : cache.getAllGatewaySenders()) { if (sender.isParallel() && allGatewaySenderIds.contains(sender.getId())) { - /** + /* * get the ParallelGatewaySender to create the colocated partitioned region for this * region. */ @@ -1204,7 +1313,6 @@ public class PartitionedRegion extends LocalRegion } final RegionLock rl = getRegionLock(); try { - // if (!rl.lock()) { if (logger.isDebugEnabled()) { logger.debug("registerPartitionedRegion: obtaining lock"); } @@ -1223,8 +1331,7 @@ public class PartitionedRegion extends LocalRegion this.getAllGatewaySenderIds()); logger.info(LocalizedMessage.create( LocalizedStrings.PartitionedRegion_PARTITIONED_REGION_0_IS_BORN_WITH_PRID_1_IDENT_2, - new Object[] {getFullPath(), Integer.valueOf(this.partitionedRegionId), - getRegionIdentifier()})); + new Object[] {getFullPath(), this.partitionedRegionId, getRegionIdentifier()})); PRSanityCheckMessage.schedule(this); } else { @@ -1238,11 +1345,11 @@ public class PartitionedRegion extends LocalRegion this.partitionedRegionId = prConfig.getPRId(); logger.info(LocalizedMessage.create( LocalizedStrings.PartitionedRegion_PARTITIONED_REGION_0_IS_CREATED_WITH_PRID_1, - new Object[] {getFullPath(), Integer.valueOf(this.partitionedRegionId)})); + new Object[] {getFullPath(), this.partitionedRegionId})); } synchronized (prIdToPR) { - prIdToPR.put(Integer.valueOf(this.partitionedRegionId), this); // last + prIdToPR.put(this.partitionedRegionId, this); // last } prConfig.addNode(this.node); if (this.getFixedPartitionAttributesImpl() != null) { @@ -1281,15 +1388,14 @@ public class PartitionedRegion extends LocalRegion SystemFailure.checkFailure(); String registerErrMsg = LocalizedStrings.PartitionedRegion_AN_EXCEPTION_WAS_CAUGHT_WHILE_REGISTERING_PARTITIONEDREGION_0_DUMPPRID_1 - .toLocalizedString(new Object[] {getFullPath(), prIdToPR.dump()}); + .toLocalizedString(getFullPath(), prIdToPR.dump()); try { synchronized (prIdToPR) { - if (prIdToPR.containsKey(Integer.valueOf(this.partitionedRegionId))) { - prIdToPR.put(Integer.valueOf(this.partitionedRegionId), PRIdMap.FAILED_REGISTRATION, - false); + if (prIdToPR.containsKey(this.partitionedRegionId)) { + prIdToPR.put(this.partitionedRegionId, PRIdMap.FAILED_REGISTRATION, false); logger.info(LocalizedMessage.create( LocalizedStrings.PartitionedRegion_FAILED_REGISTRATION_PRID_0_NAMED_1, - new Object[] {Integer.valueOf(this.partitionedRegionId), this.getName()})); + new Object[] {this.partitionedRegionId, this.getName()})); } } } catch (VirtualMachineError err) { @@ -1297,7 +1403,7 @@ public class PartitionedRegion extends LocalRegion // If this ever returns, rethrow the error. We're poisoned // now, so don't let this thread continue. throw err; - } catch (Throwable ignore) { + } catch (Throwable e) { // Whenever you catch Error or Throwable, you must also // catch VirtualMachineError (see above). However, there is // _still_ a possibility that you are dealing with a cascading @@ -1305,8 +1411,7 @@ public class PartitionedRegion extends LocalRegion // is still usable: SystemFailure.checkFailure(); if (logger.isDebugEnabled()) { - logger.debug("Partitioned Region creation, could not clean up after caught exception", - ignore); + logger.debug("Partitioned Region creation, could not clean up after caught exception", e); } } throw new PartitionedRegionException(registerErrMsg, t); @@ -1318,7 +1423,7 @@ public class PartitionedRegion extends LocalRegion } } catch (Exception es) { if (logger.isDebugEnabled()) { - logger.warn(es.getMessage(), es); + logger.debug(es.getMessage(), es); } } } @@ -1373,7 +1478,7 @@ public class PartitionedRegion extends LocalRegion /** * Get the Partitioned Region identifier used for DLocks (Bucket and Region) */ - final public String getRegionIdentifier() { + public String getRegionIdentifier() { return this.regionIdentifier; } @@ -1384,8 +1489,6 @@ public class PartitionedRegion extends LocalRegion /** * Throw an exception if persistent data recovery from disk is not complete for this region. - * - * @throws PartitionOfflineException */ public void checkPROffline() throws PartitionOfflineException { if (getDataPolicy().withPersistence() && !recoveredFromDisk) { @@ -1398,7 +1501,7 @@ public class PartitionedRegion extends LocalRegion } } - public final void updatePRConfig(PartitionRegionConfig prConfig, boolean putOnlyIfUpdated) { + public void updatePRConfig(PartitionRegionConfig prConfig, boolean putOnlyIfUpdated) { final Set<Node> nodes = prConfig.getNodes(); final PartitionedRegion colocatedRegion = ColocationHelper.getColocatedRegion(this); RegionLock colocatedLock = null; @@ -1432,11 +1535,8 @@ public class PartitionedRegion extends LocalRegion } /** - * - * @param keyInfo * @param access true if caller wants last accessed time updated * @param allowTombstones - whether a tombstone can be returned - * @return TODO */ @Override protected Region.Entry<?, ?> nonTXGetEntry(KeyInfo keyInfo, boolean access, @@ -1463,7 +1563,7 @@ public class PartitionedRegion extends LocalRegion logger.trace("getEntryInBucket: " + "Key key={} ({}) from: {} bucketId={}", key, key.hashCode(), targetNode, bucketStringForLogs(bucketId)); } - Integer bucketIdInt = Integer.valueOf(bucketId); + Integer bucketIdInt = bucketId; EntrySnapshot ret = null; int count = 0; RetryTimeKeeper retryTime = null; @@ -1503,10 +1603,10 @@ public class PartitionedRegion extends LocalRegion return ret; } catch (PRLocallyDestroyedException pde) { if (logger.isDebugEnabled()) { - logger.debug("getEntryInBucket: Encountered PRLocallyDestroyedException "); + logger.debug("getEntryInBucket: Encountered PRLocallyDestroyedException", pde); } checkReadiness(); - } catch (EntryNotFoundException enfe) { + } catch (EntryNotFoundException ignore) { return null; } catch (ForceReattemptException prce) { prce.checkKey(key); @@ -1515,7 +1615,7 @@ public class PartitionedRegion extends LocalRegion } checkReadiness(); InternalDistributedMember lastNode = retryNode; - retryNode = getOrCreateNodeForBucketRead(bucketIdInt.intValue()); + retryNode = getOrCreateNodeForBucketRead(bucketIdInt); if (lastNode.equals(retryNode)) { if (retryTime == null) { retryTime = new RetryTimeKeeper(this.retryTimeout); @@ -1530,8 +1630,8 @@ public class PartitionedRegion extends LocalRegion logger.debug("Bucket {} on Node {} not primary", notPrimary.getLocalizedMessage(), retryNode); } - getRegionAdvisor().notPrimary(bucketIdInt.intValue(), retryNode); - retryNode = getOrCreateNodeForBucketRead(bucketIdInt.intValue()); + getRegionAdvisor().notPrimary(bucketIdInt, retryNode); + retryNode = getOrCreateNodeForBucketRead(bucketIdInt); } // It's possible this is a GemFire thread e.g. ServerConnection @@ -1553,11 +1653,10 @@ public class PartitionedRegion extends LocalRegion if (logger.isDebugEnabled()) { e = new PartitionedRegionDistributionException( LocalizedStrings.PartitionRegion_NO_VM_AVAILABLE_FOR_GETENTRY_IN_0_ATTEMPTS - .toLocalizedString(Integer.valueOf(count))); + .toLocalizedString(count)); } logger.warn(LocalizedMessage.create( - LocalizedStrings.PartitionRegion_NO_VM_AVAILABLE_FOR_GETENTRY_IN_0_ATTEMPTS, - Integer.valueOf(count)), e); + LocalizedStrings.PartitionRegion_NO_VM_AVAILABLE_FOR_GETENTRY_IN_0_ATTEMPTS, count), e); return null; } @@ -1581,7 +1680,6 @@ public class PartitionedRegion extends LocalRegion * @param allowTombstones whether tombstones should be returned * @throws EntryNotFoundException if the entry doesn't exist * @throws ForceReattemptException if the peer is no longer available - * @throws PrimaryBucketException * @return true if the passed key is contained remotely. */ public EntrySnapshot getEntryRemotely(InternalDistributedMember targetNode, Integer bucketId, @@ -1616,7 +1714,7 @@ public class PartitionedRegion extends LocalRegion * @throws UnsupportedOperationException OVERRIDES */ @Override - final public Region createSubregion(String subregionName, RegionAttributes regionAttributes) + public Region createSubregion(String subregionName, RegionAttributes regionAttributes) throws RegionExistsException, TimeoutException { throw new UnsupportedOperationException(); } @@ -1710,7 +1808,7 @@ public class PartitionedRegion extends LocalRegion for (;;) { try { return doExecuteQuery(query, parameters, buckets); - } catch (ForceReattemptException fre) { + } catch (ForceReattemptException ignore) { // fall through and loop } } @@ -1736,20 +1834,20 @@ public class PartitionedRegion extends LocalRegion while (remoteIter.hasNext()) { allBuckets.add((Integer) remoteIter.next()); } - } catch (NoSuchElementException stop) { + } catch (NoSuchElementException ignore) { } } else { // local buckets Iterator localIter = null; if (this.dataStore != null) { localIter = buckets.iterator(); } else { - localIter = Collections.EMPTY_SET.iterator(); + localIter = Collections.emptySet().iterator(); } try { while (localIter.hasNext()) { allBuckets.add((Integer) localIter.next()); } - } catch (NoSuchElementException stop) { + } catch (NoSuchElementException ignore) { } } @@ -1782,7 +1880,7 @@ public class PartitionedRegion extends LocalRegion try { results = prqe.queryBuckets(null); break; - } catch (InterruptedException e) { + } catch (InterruptedException ignore) { interrupted = true; } catch (FunctionDomainException e) { throw e; @@ -1806,7 +1904,7 @@ public class PartitionedRegion extends LocalRegion // Drop Duplicates if this is a DISTINCT query boolean allowsDuplicates = results.getCollectionType().allowsDuplicates(); - // Asif: No need to apply the limit to the SelectResults. + // No need to apply the limit to the SelectResults. // We know that even if we do not apply the limit, // the results will satisfy the limit // as it has been evaluated in the iteration of List to @@ -1821,16 +1919,14 @@ public class PartitionedRegion extends LocalRegion if (selectExpr.getOrderByAttrs() != null) { // Set limit also, its not applied while building the final result set as order by is // involved. - // results = new ResultsCollectionWrapper(elementType, results.asSet(), - // query.getLimit(parameters)); } else if (allowsDuplicates) { results = new ResultsCollectionWrapper(elementType, results.asSet()); } if (selectExpr.isCount() && (results.isEmpty() || selectExpr.isDistinct())) { - SelectResults resultCount = new ResultsBag(getCachePerfStats());// Constructor with - // elementType not visible. + // Constructor with elementType not visible. + SelectResults resultCount = new ResultsBag(getCachePerfStats()); resultCount.setElementType(new ObjectTypeImpl(Integer.class)); - ((ResultsBag) resultCount).addAndGetOccurence(results.size()); + ((Bag) resultCount).addAndGetOccurence(results.size()); return resultCount; } } @@ -1874,11 +1970,6 @@ public class PartitionedRegion extends LocalRegion throw new UnsupportedOperationException(); } - // ///////////////////////////////////////////////////////////////////// - // ////////////// Operation Supported for this release - // ////////////////////////////// - // ///////////////////////////////////////////////////////////////////// - @Override boolean virtualPut(EntryEventImpl event, boolean ifNew, boolean ifOld, Object expectedOldValue, boolean requireOldValue, long lastModified, boolean overwriteDestroyed) @@ -1895,7 +1986,7 @@ public class PartitionedRegion extends LocalRegion final Integer bucketId = event.getKeyInfo().getBucketId(); assert bucketId != KeyInfo.UNKNOWN_BUCKET; // check in bucket2Node region - InternalDistributedMember targetNode = getNodeForBucketWrite(bucketId.intValue(), null); + InternalDistributedMember targetNode = getNodeForBucketWrite(bucketId, null); // force all values to be serialized early to make size computation cheap // and to optimize distribution. if (logger.isDebugEnabled()) { @@ -1905,7 +1996,7 @@ public class PartitionedRegion extends LocalRegion if (targetNode == null) { try { bucketStorageAssigned = false; - targetNode = createBucket(bucketId.intValue(), event.getNewValSizeForPR(), null); + targetNode = createBucket(bucketId, event.getNewValSizeForPR(), null); } catch (PartitionedRegionStorageException e) { // try not to throw a PRSE if the cache is closing or this region was // destroyed during createBucket() (bug 36574) @@ -1947,18 +2038,7 @@ public class PartitionedRegion extends LocalRegion rde2.initCause(rde); throw rde2; } - } - // catch (CacheWriterException cwe) { - // throw cwe; - // } - // catch (TimeoutException te) { - // throw te; - // } - // catch (RuntimeException re) { - // throw re; - // } - finally { - // event.setPutAllOperation(putAllOp_save); // Gester: temporary fix + } finally { if (putAllOp_save == null) { // only for normal put if (ifNew) { @@ -1970,8 +2050,8 @@ public class PartitionedRegion extends LocalRegion } if (!result) { checkReadiness(); - if (!ifNew && !ifOld && !this.concurrencyChecksEnabled) { // may fail due to concurrency - // conflict + if (!ifNew && !ifOld && !this.concurrencyChecksEnabled) { + // may fail due to concurrency conflict // failed for unknown reason // throw new PartitionedRegionStorageException("unable to execute operation"); logger.warn( @@ -2000,16 +2080,10 @@ public class PartitionedRegion extends LocalRegion getSharedDataView().destroyExistingEntry(event, true, null); } - /* - * (non-Javadoc) - * - * @see - * org.apache.geode.internal.cache.LocalRegion#checkIfAboveThreshold(org.apache.geode.internal. - * cache.EntryEventImpl) - */ @Override - public void checkIfAboveThreshold(EntryEventImpl evi) throws LowMemoryException { - getRegionAdvisor().checkIfBucketSick(evi.getKeyInfo().getBucketId(), evi.getKey()); + public void checkIfAboveThreshold(EntryEventImpl entryEvent) throws LowMemoryException { + getRegionAdvisor().checkIfBucketSick(entryEvent.getKeyInfo().getBucketId(), + entryEvent.getKey()); } public boolean isFixedPartitionedRegion() { @@ -2044,9 +2118,8 @@ public class PartitionedRegion extends LocalRegion return 0; } - @Override - public void postPutAllFireEvents(DistributedPutAllOperation putallOp, + public void postPutAllFireEvents(DistributedPutAllOperation putAllOp, VersionedObjectList successfulPuts) { /* * No op on pr, will happen in the buckets etc. @@ -2054,22 +2127,21 @@ public class PartitionedRegion extends LocalRegion } @Override - public void postRemoveAllFireEvents(DistributedRemoveAllOperation op, + public void postRemoveAllFireEvents(DistributedRemoveAllOperation removeAllOp, VersionedObjectList successfulOps) { /* * No op on pr, will happen in the buckets etc. */ } - /** * Create PutAllPRMsgs for each bucket, and send them. * - * @param putallO DistributedPutAllOperation object. + * @param putAllOp DistributedPutAllOperation object. * @param successfulPuts not used in PartitionedRegion. */ @Override - public long postPutAllSend(DistributedPutAllOperation putallO, + public long postPutAllSend(DistributedPutAllOperation putAllOp, VersionedObjectList successfulPuts) { final boolean isDebugEnabled = logger.isDebugEnabled(); @@ -2077,94 +2149,85 @@ public class PartitionedRegion extends LocalRegion throw new CacheClosedException("Cache is shutting down"); } - try { - final long startTime = PartitionedRegionStats.startTime(); - // build all the msgs by bucketid - HashMap prMsgMap = putallO.createPRMessages(); - PutAllPartialResult partialKeys = new PutAllPartialResult(putallO.putAllDataSize); - - // clear the successfulPuts list since we're actually doing the puts here - // and the basicPutAll work was just a way to build the DPAO object - Map<Object, VersionTag> keyToVersionMap = - new HashMap<Object, VersionTag>(successfulPuts.size()); - successfulPuts.clearVersions(); - Iterator itor = prMsgMap.entrySet().iterator(); - while (itor.hasNext()) { - Map.Entry mapEntry = (Map.Entry) itor.next(); - Integer bucketId = (Integer) mapEntry.getKey(); - PutAllPRMessage prMsg = (PutAllPRMessage) mapEntry.getValue(); - checkReadiness(); - long then = 0; - if (isDebugEnabled) { - then = System.currentTimeMillis(); + final long startTime = PartitionedRegionStats.startTime(); + // build all the msgs by bucketid + HashMap prMsgMap = putAllOp.createPRMessages(); + PutAllPartialResult partialKeys = new PutAllPartialResult(putAllOp.putAllDataSize); + + // clear the successfulPuts list since we're actually doing the puts here + // and the basicPutAll work was just a way to build the DPAO object + Map<Object, VersionTag> keyToVersionMap = + new HashMap<Object, VersionTag>(successfulPuts.size()); + successfulPuts.clearVersions(); + Iterator itor = prMsgMap.entrySet().iterator(); + while (itor.hasNext()) { + Map.Entry mapEntry = (Map.Entry) itor.next(); + Integer bucketId = (Integer) mapEntry.getKey(); + PutAllPRMessage prMsg = (PutAllPRMessage) mapEntry.getValue(); + checkReadiness(); + long then = 0; + if (isDebugEnabled) { + then = System.currentTimeMillis(); + } + try { + VersionedObjectList versions = sendMsgByBucket(bucketId, prMsg); + if (versions.size() > 0) { + partialKeys.addKeysAndVersions(versions); + versions.saveVersions(keyToVersionMap); + } else if (!this.concurrencyChecksEnabled) { // no keys returned if not versioned + Set keys = prMsg.getKeys(); + partialKeys.addKeys(keys); } - try { - VersionedObjectList versions = sendMsgByBucket(bucketId, prMsg); - if (versions.size() > 0) { - partialKeys.addKeysAndVersions(versions); - versions.saveVersions(keyToVersionMap); - } else if (!this.concurrencyChecksEnabled) { // no keys returned if not versioned - Set keys = prMsg.getKeys(); - partialKeys.addKeys(keys); - } - } catch (PutAllPartialResultException pre) { - // sendMsgByBucket applied partial keys - if (isDebugEnabled) { - logger.debug("PR.postPutAll encountered PutAllPartialResultException, ", pre); - } - partialKeys.consolidate(pre.getResult()); - } catch (Exception ex) { - // If failed at other exception - if (isDebugEnabled) { - logger.debug("PR.postPutAll encountered exception at sendMsgByBucket, ", ex); - } - @Released - EntryEventImpl firstEvent = prMsg.getFirstEvent(this); - try { - partialKeys.saveFailedKey(firstEvent.getKey(), ex); - } finally { - firstEvent.release(); - } + } catch (PutAllPartialResultException pre) { + // sendMsgByBucket applied partial keys + if (isDebugEnabled) { + logger.debug("PR.postPutAll encountered PutAllPartialResultException, ", pre); } + partialKeys.consolidate(pre.getResult()); + } catch (Exception ex) { + // If failed at other exception if (isDebugEnabled) { - long now = System.currentTimeMillis(); - if ((now - then) >= 10000) { - logger.debug("PR.sendMsgByBucket took " + (now - then) + " ms"); - } + logger.debug("PR.postPutAll encountered exception at sendMsgByBucket, ", ex); + } + @Released + EntryEventImpl firstEvent = prMsg.getFirstEvent(this); + try { + partialKeys.saveFailedKey(firstEvent.getKey(), ex); + } finally { + firstEvent.release(); } } - this.prStats.endPutAll(startTime); - if (!keyToVersionMap.isEmpty()) { - for (Iterator it = successfulPuts.getKeys().iterator(); it.hasNext();) { - successfulPuts.addVersion(keyToVersionMap.get(it.next())); + if (isDebugEnabled) { + long now = System.currentTimeMillis(); + if ((now - then) >= 10000) { + logger.debug("PR.sendMsgByBucket took " + (now - then) + " ms"); } - keyToVersionMap.clear(); } + } + this.prStats.endPutAll(startTime); + if (!keyToVersionMap.isEmpty()) { + for (Iterator it = successfulPuts.getKeys().iterator(); it.hasNext();) { + successfulPuts.addVersion(keyToVersionMap.get(it.next())); + } + keyToVersionMap.clear(); + } - if (partialKeys.hasFailure()) { - logger.info(LocalizedMessage.create(LocalizedStrings.Region_PutAll_Applied_PartialKeys_0_1, - new Object[] {getFullPath(), partialKeys})); - if (putallO.isBridgeOperation()) { - if (partialKeys.getFailure() instanceof CancelException) { - throw (CancelException) partialKeys.getFailure(); - } else { - throw new PutAllPartialResultException(partialKeys); - } + if (partialKeys.hasFailure()) { + logger.info(LocalizedMessage.create(LocalizedStrings.Region_PutAll_Applied_PartialKeys_0_1, + new Object[] {getFullPath(), partialKeys})); + if (putAllOp.isBridgeOperation()) { + if (partialKeys.getFailure() instanceof CancelException) { + throw (CancelException) partialKeys.getFailure(); } else { - if (partialKeys.getFailure() instanceof RuntimeException) { - throw (RuntimeException) partialKeys.getFailure(); - } else { - throw new RuntimeException(partialKeys.getFailure()); - } + throw new PutAllPartialResultException(partialKeys); + } + } else { + if (partialKeys.getFailure() instanceof RuntimeException) { + throw (RuntimeException) partialKeys.getFailure(); + } else { + throw new RuntimeException(partialKeys.getFailure()); } } - } finally { - /* - * // TODO XD OFFHEAP MERGE: do we have any events that need freeOffHeapReferences for - * (PutAllPRMessage.PutAllResponse resp : responses) { PutAllPRMessage.PRMsgResponseContext - * ctx = resp.getContextObject(); if (ctx != null) { EntryEventImpl e = ctx.getEvent(); if (e - * != null) { e.release(); } } } - */ } return -1; } @@ -2272,7 +2335,7 @@ public class PartitionedRegion extends LocalRegion EntryEventImpl event = prMsg.getFirstEvent(this); try { RetryTimeKeeper retryTime = null; - InternalDistributedMember currentTarget = getNodeForBucketWrite(bucketId.intValue(), null); + InternalDistributedMember currentTarget = getNodeForBucketWrite(bucketId, null); if (isDebugEnabled) { logger.debug("PR.sendMsgByBucket:bucket {}'s currentTarget is {}", bucketId, currentTarget); } @@ -2304,7 +2367,7 @@ public class PartitionedRegion extends LocalRegion boolean interrupted = Thread.interrupted(); try { Thread.sleep(PartitionedRegionHelper.DEFAULT_WAIT_PER_RETRY_ITERATION); - } catch (InterruptedException e) { + } catch (InterruptedException ignore) { interrupted = true; } finally { if (interrupted) { @@ -2343,9 +2406,9 @@ public class PartitionedRegion extends LocalRegion if (retryTime == null) { retryTime = new RetryTimeKeeper(this.retryTimeout); } - currentTarget = getNodeForBucketWrite(bucketId.intValue(), retryTime); + currentTarget = getNodeForBucketWrite(bucketId, retryTime); if (isDebugEnabled) { - logger.debug("PR.sendMsgByBucket: Old target was {}, Retrying", lastTarget, + logger.debug("PR.sendMsgByBucket: Old target was {}, Retrying {}", lastTarget, currentTarget); } if (lastTarget.equals(currentTarget)) { @@ -2369,11 +2432,11 @@ public class PartitionedRegion extends LocalRegion logger.debug("Bucket {} on Node {} not primnary", notPrimary.getLocalizedMessage(), currentTarget); } - getRegionAdvisor().notPrimary(bucketId.intValue(), currentTarget); + getRegionAdvisor().notPrimary(bucketId, currentTarget); if (retryTime == null) { retryTime = new RetryTimeKeeper(this.retryTimeout); } - currentTarget = getNodeForBucketWrite(bucketId.intValue(), retryTime); + currentTarget = getNodeForBucketWrite(bucketId, retryTime); } catch (DataLocationException dle) { if (isDebugEnabled) { logger.debug("DataLocationException processing putAll", dle); @@ -2413,7 +2476,7 @@ public class PartitionedRegion extends LocalRegion EntryEventImpl event = prMsg.getFirstEvent(this); try { RetryTimeKeeper retryTime = null; - InternalDistributedMember currentTarget = getNodeForBucketWrite(bucketId.intValue(), null); + InternalDistributedMember currentTarget = getNodeForBucketWrite(bucketId, null); if (logger.isDebugEnabled()) { logger.debug("PR.sendMsgByBucket:bucket {}'s currentTarget is {}", bucketId, currentTarget); } @@ -2445,7 +2508,7 @@ public class PartitionedRegion extends LocalRegion boolean interrupted = Thread.interrupted(); try { Thread.sleep(PartitionedRegionHelper.DEFAULT_WAIT_PER_RETRY_ITERATION); - } catch (InterruptedException e) { + } catch (InterruptedException ignore) { interrupted = true; } finally { if (interrupted) { @@ -2484,7 +2547,7 @@ public class PartitionedRegion extends LocalRegion if (retryTime == null) { retryTime = new RetryTimeKeeper(this.retryTimeout); } - currentTarget = getNodeForBucketWrite(bucketId.intValue(), retryTime); + currentTarget = getNodeForBucketWrite(bucketId, retryTime); if (logger.isTraceEnabled()) { logger.trace("PR.sendMsgByBucket: Old target was {}, Retrying {}", lastTarget, currentTarget); @@ -2510,11 +2573,11 @@ public class PartitionedRegion extends LocalRegion logger.debug("Bucket {} on Node {} not primary", notPrimary.getLocalizedMessage(), currentTarget); } - getRegionAdvisor().notPrimary(bucketId.intValue(), currentTarget); + getRegionAdvisor().notPrimary(bucketId, currentTarget); if (retryTime == null) { retryTime = new RetryTimeKeeper(this.retryTimeout); } - currentTarget = getNodeForBucketWrite(bucketId.intValue(), retryTime); + currentTarget = getNodeForBucketWrite(bucketId, retryTime); } catch (DataLocationException dle) { if (logger.isDebugEnabled()) { logger.debug("DataLocationException processing putAll", dle); @@ -2658,8 +2721,7 @@ public class PartitionedRegion extends LocalRegion boolean requireOldValue, final long lastModified) { if (logger.isDebugEnabled()) { logger.debug("putInBucket: {} ({}) to {} to bucketId={} retry={} ms", event.getKey(), - event.getKey().hashCode(), targetNode, bucketStringForLogs(bucketId.intValue()), - retryTimeout); + event.getKey().hashCode(), targetNode, bucketStringForLogs(bucketId), retryTimeout); } // retry the put remotely until it finds the right node managing the bucket @@ -2692,7 +2754,7 @@ public class PartitionedRegion extends LocalRegion boolean interrupted = Thread.interrupted(); try { Thread.sleep(PartitionedRegionHelper.DEFAULT_WAIT_PER_RETRY_ITERATION); - } catch (InterruptedException e) { + } catch (InterruptedException ignore) { interrupted = true; } finally { if (interrupted) { @@ -2726,11 +2788,6 @@ public class PartitionedRegion extends LocalRegion } checkIfAboveThreshold(event); if (isLocal) { - // final boolean cacheWrite = !event.isOriginRemote() - // && !event.isNetSearch(); - // if (cacheWrite) { - // doCacheWriteBeforePut(event, ifNew); - // } event.setInvokePRCallbacks(true); long start = this.prStats.startPutLocal(); try { @@ -2740,7 +2797,7 @@ public class PartitionedRegion extends LocalRegion // given that most manipulation of values is remote (requiring serialization to send). // But... function execution always implies local manipulation of // values so keeping locally updated values in Object form should be more efficient. - if (!DistributionManager.isFunctionExecutionThread.get().booleanValue()) { + if (!DistributionManager.isFunctionExecutionThread.get()) { // TODO: this condition may not help since BucketRegion.virtualPut calls // forceSerialized br.forceSerialized(event); @@ -2811,7 +2868,7 @@ public class PartitionedRegion extends LocalRegion if (retryTime == null) { retryTime = new RetryTimeKeeper(this.retryTimeout); } - currentTarget = getNodeForBucketWrite(bucketId.intValue(), retryTime); + currentTarget = getNodeForBucketWrite(bucketId, retryTime); if (lastTarget.equals(currentTarget)) { if (retryTime.overMaximum()) { PRHARedundancyProvider.timedOut(this, null, null, "update an entry", this.retryTimeout); @@ -2825,11 +2882,11 @@ public class PartitionedRegion extends LocalRegion logger.debug("Bucket {} on Node {} not primary", notPrimary.getLocalizedMessage(), currentTarget); } - getRegionAdvisor().notPrimary(bucketId.intValue(), currentTarget); + getRegionAdvisor().notPrimary(bucketId, currentTarget); if (retryTime == null) { retryTime = new RetryTimeKeeper(this.retryTimeout); } - currentTarget = getNodeForBucketWrite(bucketId.intValue(), retryTime); + currentTarget = getNodeForBucketWrite(bucketId, retryTime); } // It's possible this is a GemFire thread e.g. ServerConnection @@ -2857,7 +2914,7 @@ public class PartitionedRegion extends LocalRegion if (logger.isDebugEnabled()) { logger.debug( "putInBucket for bucketId = {} failed (attempt # {} ({} ms left), retrying with node {}", - bucketStringForLogs(bucketId.intValue()), count, (timeOut - System.currentTimeMillis()), + bucketStringForLogs(bucketId), count, (timeOut - System.currentTimeMillis()), currentTarget); } } // for @@ -2885,45 +2942,16 @@ public class PartitionedRegion extends LocalRegion } retryTime.waitForBucketsRecovery(); - newNode = getNodeForBucketWrite(bucketId.intValue(), retryTime); + newNode = getNodeForBucketWrite(bucketId, retryTime); if (newNode == null) { - newNode = createBucket(bucketId.intValue(), getEntrySize(event), retryTime); + newNode = createBucket(bucketId, getEntrySize(event), retryTime); } return newNode; } /** - * Serialize the key and value early (prior to creating the message) to gather the size of the - * entry Assumes the new value from the <code>EntryEventImpl</code> is not serialized - * - * @return sum of bytes as reported by {@link CachedDeserializable#getSizeInBytes()} - */ - // private int serializeValue(EntryEventImpl event) - // { - // TODO serialize the key as well - // this code used to make the following call: - // Object val = event.getNewValue(); - // which deserializes the value and we don't want to do that. - // int numBytes = 0; - // Object val = event.getNewValue(); - // if (val == null) { - // // event.setSerializedNewValue(new byte[] {DataSerializer.NULL}); - // return 0; - // } - // if (val instanceof byte[]) { - // byte[] v = (byte[]) val; - // numBytes = v.length; - // } else { - // if (event.getSerializedNewValue() == null) { - // event.setSerializedNewValue(EntryEventImpl.serialize(event.getNewValue())); - // } - // numBytes = getEntrySize(event); - // } - // return numBytes; - // } - /** - * Get the serialized size of an <code>EntryEventImpl</code> + * Get the serialized size of an {@code EntryEventImpl} * * @param eei the entry from whcih to fetch the size * @return the size of the serialized entry @@ -2932,28 +2960,11 @@ public class PartitionedRegion extends LocalRegion @Unretained final Object v = eei.getRawNewValue(); if (v instanceof CachedDeserializable) { - return ((CachedDeserializable) v).getSizeInBytes(); + return ((Sizeable) v).getSizeInBytes(); } return 0; } - // /** - // * Gets the Node that is managing a specific bucketId. This does consider - // the - // * failed nodes. - // * - // * @param bucketId - // * identifier for bucket - // * @param failedNodeList - // * of all the failedNodes to avoid these failed nodes to be picked in - // * the next node selection. - // * @return the Node managing the bucket - // */ - // private Node getNodeForBucketExcludeFailedNode(final Long bucketId, - // final List failedNodeList) { - // throw new IllegalStateException("bucket2node should not be used"); - // } - public InternalDistributedMember getOrCreateNodeForBucketWrite(int bucketId, final RetryTimeKeeper snoozer) { InternalDistributedMember targetNode = getNodeForBucketWrite(bucketId, snoozer); @@ -3015,8 +3026,7 @@ public class PartitionedRegion extends LocalRegion final TimeoutException noTime = new TimeoutException( LocalizedStrings.PartitionedRegion_ATTEMPT_TO_ACQUIRE_PRIMARY_NODE_FOR_WRITE_ON_BUCKET_0_TIMED_OUT_IN_1_MS_CURRENT_REDUNDANCY_2_DOES_NOT_SATISFY_MINIMUM_3 .toLocalizedString(new Object[] {bucketStringForLogs(bucketId), - Integer.valueOf(localSnoozer.getRetryTime()), Integer.valueOf(red), - Integer.valueOf(this.minimumWriteRedundancy)})); + localSnoozer.getRetryTime(), red, this.minimumWriteRedundancy})); checkReadiness(); throw noTime; } @@ -3028,8 +3038,6 @@ public class PartitionedRegion extends LocalRegion return waitForNoStorageOrPrimary(bucketId, "write"); } - - /** * wait until there is a primary or there is no storage * @@ -3134,8 +3142,7 @@ public class PartitionedRegion extends LocalRegion if (isTX()) { return getNodeForBucketWrite(bucketId, null); } - InternalDistributedMember result = getRegionAdvisor().getPreferredNode(bucketId); - return result; + return getRegionAdvisor().getPreferredNode(bucketId); } /** @@ -3307,9 +3314,6 @@ public class PartitionedRegion extends LocalRegion * can be executed on just one fabric node, executed in parallel on a subset of nodes in parallel * across all the nodes. * - * @param function - * @param execution - * @param rc * @since GemFire 6.0 */ public ResultCollector executeFunction(final Function function, @@ -3363,9 +3367,6 @@ public class PartitionedRegion extends LocalRegion /** * Executes function on multiple nodes - * - * @param function - * @param execution */ private ResultCollector executeOnMultipleNodes(final Function function, final PartitionedRegionFunctionExecutor execution, ResultCollector rc, boolean isPRSingleHop, @@ -3412,8 +3413,7 @@ public class PartitionedRegion extends LocalRegion boolean hasRemovedNode = false; while (iterator.hasNext()) { - if (execution.getFailedNodes() - .contains(((InternalDistributedMember) iterator.next()).getId())) { + if (execution.getFailedNodes().contains(((DistributedMember) iterator.next()).getId())) { hasRemovedNode = true; } } @@ -3482,7 +3482,7 @@ public class PartitionedRegion extends LocalRegion .constructAndGetAllColocatedLocalDataSet(PartitionedRegion.this, localBucketSet), localBucketSet, resultSender, execution.isReExecute()); if (logger.isDebugEnabled()) { - logger.debug("FunctionService: Executing on local node with keys.{}" + localKeys); + logger.debug("FunctionService: Executing on local node with keys.{}", localKeys); } execution.executeFunctionOnLocalPRNode(function, prContext, resultSender, dm, isTX()); } @@ -3500,8 +3500,8 @@ public class PartitionedRegion extends LocalRegion recipMap.put(recip, context); } if (logger.isDebugEnabled()) { - logger.debug("FunctionService: Executing on remote nodes with member to keys map.{}" - + memberToKeysMap); + logger.debug("FunctionService: Executing on remote nodes with member to keys map.{}", + memberToKeysMap); } PartitionedRegionFunctionResultWaiter resultReciever = new PartitionedRegionFunctionResultWaiter(getSystem(), this.getPRId(), @@ -3509,14 +3509,11 @@ public class PartitionedRegion extends LocalRegion return resultReciever.getPartitionedDataFrom(recipMap, this, execution); } return localResultCollector; - } /** * Single key execution on single node * - * @param function - * @param execution * @since GemFire 6.0 */ private ResultCollector executeOnSingleNode(final Function function, @@ -3526,14 +3523,14 @@ public class PartitionedRegion extends LocalRegion final Object key = routingKeys.iterator().next(); final Integer bucketId; if (isBucketSetAsFilter) { - bucketId = ((Integer) key).intValue(); + bucketId = (Integer) key; } else { - bucketId = Integer.valueOf( - PartitionedRegionHelper.getHashKey(this, Operation.FUNCTION_EXECUTION, key, null, null)); + bucketId = + PartitionedRegionHelper.getHashKey(this, Operation.FUNCTION_EXECUTION, key, null, null); } InternalDistributedMember targetNode = null; if (function.optimizeForWrite()) { - targetNode = createBucket(bucketId.intValue(), 0, null /* retryTimeKeeper */); + targetNode = createBucket(bucketId, 0, null /* retryTimeKeeper */); HeapMemoryMonitor hmm = ((InternalResourceManager) cache.getResourceManager()).getHeapMonitor(); if (hmm.isMemberHeapCritical(targetNode) @@ -3541,11 +3538,11 @@ public class PartitionedRegion extends LocalRegion Set<DistributedMember> sm = Collections.singleton((DistributedMember) targetNode); throw new LowMemoryException( LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1 - .toLocalizedString(new Object[] {function.getId(), sm}), + .toLocalizedString(function.getId(), sm), sm); } } else { - targetNode = getOrCreateNodeForBucketRead(bucketId.intValue()); + targetNode = getOrCreateNodeForBucketRead(bucketId); } final DistributedMember localVm = getMyId(); if (targetNode != null && isPRSingleHop && !localVm.equals(targetNode)) { @@ -3580,7 +3577,7 @@ public class PartitionedRegion extends LocalRegion * if (retryTime.overMaximum()) { PRHARedundancyProvider.timedOut(this, null, null, * "doing function execution", this.retryTimeout); // NOTREACHED } */ - // Asif: Fix for Bug # 40083 + // Fix for Bug # 40083 targetNode = null; while (targetNode == null) { if (retryTime.overMaximum()) { @@ -3590,9 +3587,9 @@ public class PartitionedRegion extends LocalRegion } retryTime.waitToRetryNode(); if (function.optimizeForWrite()) { - targetNode = getOrCreateNodeForBucketWrite(bucketId.intValue(), retryTime); + targetNode = getOrCreateNodeForBucketWrite(bucketId, retryTime); } else { - targetNode = getOrCreateNodeForBucketRead(bucketId.intValue()); + targetNode = getOrCreateNodeForBucketRead(bucketId); } } if (targetNode == null) { @@ -3636,7 +3633,8 @@ public class PartitionedRegion extends LocalRegion Set<Integer> actualBucketSet = this.getRegionAdvisor().getBucketSet(); try { bucketSet.retainAll(actualBucketSet); - } catch (NoSuchElementException done) { + } catch (NoSuchElementException ignore) { + // done } HashMap<InternalDistributedMember, HashSet<Integer>> memberToBuckets = FunctionExecutionNodePruner.groupByMemberToBuckets(this, bucketSet, @@ -3692,8 +3690,7 @@ public class PartitionedRegion extends LocalRegion boolean hasRemovedNode = false; while (iterator.hasNext()) { - if (execution.getFailedNodes() - .contains(((InternalDistributedMember) iterator.next()).getId())) { + if (execution.getFailedNodes().contains(((DistributedMember) iterator.next()).getId())) { hasRemovedNode = true; } } @@ -3720,7 +3717,7 @@ public class PartitionedRegion extends LocalRegion Set<DistributedMember> sm = SetUtils.intersection(hcm, dest); throw new LowMemoryException( LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1 - .toLocalizedString(new Object[] {function.getId(), sm}), + .toLocalizedString(function.getId(), sm), sm); } @@ -3738,8 +3735,6 @@ public class PartitionedRegion extends LocalRegion execution.isReExecute(), execution.isFnSerializationReqd()); recipMap.put(recip, context); } - // final LocalResultCollector localResultCollector = new LocalResultCollector(function, rc, - // execution); final LocalResultCollector<?, ?> localRC = execution.getLocalResultCollector(function, rc); final DM dm = getDistributionManager(); @@ -3755,28 +3750,18 @@ public class PartitionedRegion extends LocalRegion execution.getArgumentsForMember(getMyId().getId()), null, ColocationHelper .constructAndGetAllColocatedLocalDataSet(PartitionedRegion.this, localBucketSet), localBucketSet, resultSender, execution.isReExecute()); - // final RegionFunctionContextImpl prContext = new RegionFunctionContextImpl( - // function.getId(), PartitionedRegion.this, execution - // .getArgumentsForMember(getMyId().getId()), null, ColocationHelper - // .constructAndGetAllColocatedLocalDataSet(PartitionedRegion.this, - // localBucketSet), resultSender, execution.isReExecute()); execution.executeFunctionOnLocalNode(function, prContext, resultSender, dm, isTX()); } PartitionedRegionFunctionResultWaiter resultReciever = new PartitionedRegionFunctionResultWaiter(getSystem(), this.getPRId(), localRC, function, resultSender); - ResultCollector reply = resultReciever.getPartitionedDataFrom(recipMap, this, execution); - - return reply; - + return resultReciever.getPartitionedDataFrom(recipMap, this, execution); } /** * Executes function on all bucket nodes * - * @param function - * @param execution * @return ResultCollector * @since GemFire 6.0 */ @@ -3788,7 +3773,7 @@ public class PartitionedRegion extends LocalRegion while (itr.hasNext()) { try { bucketSet.add(itr.next()); - } catch (NoSuchElementException ex) { + } catch (NoSuchElementException ignore) { } } HashMap<InternalDistributedMember, HashSet<Integer>> memberToBuckets = @@ -3810,8 +3795,7 @@ public class PartitionedRegion extends LocalRegion boolean hasRemovedNode = false; while (iterator.hasNext()) { - if (execution.getFailedNodes() - .contains(((InternalDistributedMember) iterator.next()).getId())) { + if (execution.getFailedNodes().contains(((DistributedMember) iterator.next()).getId())) { hasRemovedNode = true; } } @@ -3868,18 +3852,11 @@ public class PartitionedRegion extends LocalRegion new PartitionedRegionFunctionResultWaiter(getSystem(), this.getPRId(), localResultCollector, function, resultSender); - ResultCollector reply = resultReciever.getPartitionedDataFrom(recipMap, this, execution); - - return reply; + return resultReciever.getPartitionedDataFrom(recipMap, this, execution); } /** - * no docs - * - * @param preferCD * @param requestingClient the client requesting the object, or null if not from a client - * @param clientEvent TODO - * @param returnTombstones TODO * @param allowRetry if false then do not retry */ private Object getFromBucket(final InternalDistributedMember targetNode, int bucketId, @@ -3929,13 +3906,10 @@ public class PartitionedRegion extends LocalRegion } } - // Test hook - if (((LocalRegion) this).isTest()) - ((LocalRegion) this).incCountNotFoundInLocal(); obj = getRemotely(retryNode, bucketId, key, aCallbackArgument, preferCD, requestingClient, clientEvent, returnTombstones); - // TODO:Suranjan&Yogesh : there should be better way than this one + // TODO: there should be better way than this one String name = Thread.currentThread().getName(); if (name.startsWith("ServerConnection") && !getMyId().equals(retryNode)) { setNetworkHopType(bucketId, (InternalDistributedMember) retryNode); @@ -3988,7 +3962,8 @@ public class PartitionedRegion extends LocalRegion if (prce instanceof BucketNotFoundException) { TransactionException ex = new TransactionDataRebalancedException( LocalizedStrings.PartitionedRegion_TRANSACTIONAL_DATA_MOVED_DUE_TO_REBALANCING - .toLocalizedString(key)); + .toLocalizedString(key), + prce); ex.initCause(prce); throw ex; } @@ -4007,8 +3982,7 @@ public class PartitionedRegion extends LocalRegion // Make transaction fail so client could retry // instead of returning null if ForceReattemptException is thrown. // Should not see it currently, added to be protected against future changes. - TransactionException ex = new TransactionException("Failed to get key: " + key, prce); - throw ex; + throw new TransactionException("Failed to get key: " + key, prce); } } } catch (PrimaryBucketException notPrimary) { @@ -4046,17 +4020,15 @@ public class PartitionedRegion extends LocalRegion if (logger.isDebugEnabled()) { e = new PartitionedRegionDistributionException( LocalizedStrings.PartitionRegion_NO_VM_AVAILABLE_FOR_GET_IN_0_ATTEMPTS - .toLocalizedString(Integer.valueOf(count))); + .toLocalizedString(count)); } - logger.warn(LocalizedMessage.create( - LocalizedStrings.PartitionRegion_NO_VM_AVAILABLE_FOR_GET_IN_0_ATTEMPTS, - Integer.valueOf(count)), e); + logger.warn(LocalizedMessage + .create(LocalizedStrings.PartitionRegion_NO_VM_AVAILABLE_FOR_GET_IN_0_ATTEMPTS, count), e); return null; } /** * If a bucket is local, try to fetch the value from it - * */ public Object getFromLocalBucket(int bucketId, final Object key, final Object aCallbackArgument, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient, @@ -4079,14 +4051,11 @@ public class PartitionedRegion extends LocalRegion return null; } - /** * This invokes a cache writer before a destroy operation. Although it has the same method * signature as the method in LocalRegion, it is invoked in a different code path. LocalRegion * invokes this method via its "entries" member, while PartitionedRegion invokes this method in * its region operation methods and messages. - * - * @see LocalRegion#cacheWriteBeforeRegionDestroy(RegionEventImpl) */ @Override boolean cacheWriteBeforeRegionDestroy(RegionEventImpl event) @@ -4125,8 +4094,7 @@ public class PartitionedRegion extends LocalRegion */ public DistributedMember getMemberOwning(Object key) { int bucketId = PartitionedRegionHelper.getHashKey(this, null, key, null, null); - InternalDistributedMember targetNode = getNodeForBucketRead(bucketId); - return targetNode; + return getNodeForBucketRead(bucketId); } /** @@ -4150,7 +4118,6 @@ public class PartitionedRegion extends LocalRegion public Object localCacheGet(Object key) { RegionEntry re = getRegionMap().getEntry(key); if (re == null || re.isDestroyedOrRemoved()) { - // TODO:KIRK:OK if (re == null || Token.isRemoved(re.getValueInVM(this))) { return null; } else { return re.getValue(this); // OFFHEAP: spin until we can copy into a heap cd? @@ -4169,12 +4136,9 @@ public class PartitionedRegion extends LocalRegion /** * Test Method: Get a random set of keys from a randomly selected bucket using the provided - * <code>Random</code> number generator. + * {@code Random} number generator. * - * @param rnd * @return A set of keys from a randomly chosen bucket or {@link Collections#EMPTY_SET} - * @throws IOException - * @throws ClassNotFoundException */ public Set getSomeKeys(Random rnd) throws IOException, ClassNotFoundException { InternalDistributedMember nod = null; @@ -4196,7 +4160,7 @@ public class PartitionedRegion extends LocalRegion } buck = (Integer) buksA[ind]; - nod = getNodeForBucketRead(buck.intValue()); + nod = getNodeForBucketRead(buck); if (nod != null) { logger.debug("getSomeKeys: iteration: {} for node {}", i, nod); if (nod.equals(getMyId())) { @@ -4217,7 +4181,7 @@ public class PartitionedRegion extends LocalRegion "Test hook getSomeKeys caught a ForceReattemptException for bucketId={}{}{}. Moving on to another bucket", getPRId(), BUCKET_ID_SEPARATOR, buck, movinOn); continue; - } catch (PRLocallyDestroyedException pde) { + } catch (PRLocallyDestroyedException ignore) { logger.debug("getSomeKeys: Encountered PRLocallyDestroyedException"); checkReadiness(); continue; @@ -4226,7 +4190,7 @@ public class PartitionedRegion extends LocalRegion } // nod != null } // for logger.debug("getSomeKeys: no keys found returning empty set"); - return Collections.EMPTY_SET; + return Collections.emptySet(); } /** @@ -4239,7 +4203,7 @@ public class PartitionedRegion extends LocalRegion */ public List<BucketDump> getAllBucketEntries(final int bucketId) throws ForceReattemptException { if (bucketId >= getTotalNumberOfBuckets()) { - return Collections.EMPTY_LIST; + return Collections.emptyList(); } ArrayList<BucketDump> ret = new ArrayList<BucketDump>(); HashSet<InternalDistributedMember> collected = new HashSet<InternalDistributedMember>(); @@ -4263,6 +4227,7 @@ public class PartitionedRegion extends LocalRegion if (owner.equals(getMyId())) { BucketRegion br = this.dataStore.handleRemoteGetEntries(bucketId); Map<Object, Object> m = new HashMap<Object, Object>() { + // TODO: clean this up -- outer class is not serializable private static final long serialVersionUID = 0L; @Override @@ -4304,7 +4269,7 @@ public class PartitionedRegion extends LocalRegion final FetchEntriesResponse r; r = FetchEntriesMessage.send(owner, this, bucketId); ret.add(r.waitForEntries()); - } catch (ForceReattemptException e) { + } catch (ForceReattemptException ignore) { // node has departed? Ignore. } } // for @@ -4312,11 +4277,9 @@ public class PartitionedRegion extends LocalRegion return ret; } - /** * Fetch the keys for the given bucket identifier, if the bucket is local or remote. * - * @param bucketNum * @return A set of keys from bucketNum or {@link Collections#EMPTY_SET}if no keys can be found. */ public Set getBucketKeys(int bucketNum) { @@ -4327,12 +4290,11 @@ public class PartitionedRegion extends LocalRegion * Fetch the keys for the given bucket identifier, if the bucket is local or remote. This version * of the method allows you to retrieve Tombstone entries as well as undestroyed entries. * - * @param bucketNum * @param allowTombstones whether to include destroyed entries in the result * @return A set of keys from bucketNum or {@link Collections#EMPTY_SET}if no keys can be found. */ public Set getBucketKeys(int bucketNum, boolean allowTombstones) { - Integer buck = Integer.valueOf(bucketNum); + Integer buck = bucketNum; final int retryAttempts = calcRetry(); Set ret = null; int count = 0; @@ -4371,7 +4333,7 @@ public class PartitionedRegion extends LocalRegion if (ret != null) { return ret; } - } catch (PRLocallyDestroyedException pde) { + } catch (PRLocallyDestroyedException ignore) { if (logger.isDebugEnabled()) { logger.debug("getBucketKeys: Encountered PRLocallyDestroyedException"); } @@ -4385,14 +4347,13 @@ public class PartitionedRegion extends LocalRegion snoozer = new RetryTimeKeeper(this.retryTimeout); } InternalDistributedMember oldNode = nod; - nod = getNodeForBucketRead(buck.intValue()); + nod = getNodeForBucketRead(buck); if (nod != null && nod.equals(oldNode)) { if (snoozer.overMaximum()) { checkReadiness(); throw new TimeoutException( LocalizedStrings.PartitionedRegion_ATTEMPT_TO_ACQUIRE_PRIMARY_NODE_FOR_READ_ON_BUCKET_0_TIMED_OUT_IN_1_MS - .toLocalizedString(new Object[] {getBucketName(buck.intValue()), - Integer.valueOf(snoozer.getRetryTime())})); + .toLocalizedString(new Object[] {getBucketName(buck), snoozer.getRetryTime()})); } snoozer.waitToRetryNode(); } @@ -4403,7 +4364,7 @@ public class PartitionedRegion extends LocalRegion if (logger.isDebugEnabled()) { logger.debug("getBucketKeys: no keys found returning empty set"); } - return Collections.EMPTY_SET; + return Collections.emptySet(); } /** @@ -4478,12 +4439,7 @@ public class PartitionedRegion extends LocalRegion } /** - * - * @param nodeToBuckets - * @param values - * @param servConn * @return set of bucket-ids that could not be read from. - * @throws IOException */ private Set<Integer> handleOldNodes(HashMap nodeToBuckets, VersionedObjectList values, ServerConnection servConn) throws IOException { @@ -4510,7 +4466,7 @@ public class PartitionedRegion extends LocalRegion try { FetchKeysResponse fkr = FetchKeysMessage.send(member, this, bucket, true); keys = fkr.waitForKeys(); - } catch (ForceReattemptException fre) { + } catch (ForceReattemptException ignore) { failures.add(bucket); } } else { @@ -4642,8 +4598,6 @@ public class PartitionedRegion extends LocalRegion } /** - * - * @param retryTime * @return boolean False indicates caller should stop re-trying. */ private boolean waitForFetchRemoteEntriesRetry(RetryTimeKeeper retryTime) { @@ -4664,9 +4618,9 @@ public class PartitionedRegion extends LocalRegion keys = this.dataStore.getKeysLocally(id, true); } result.addAll(keys); - } catch (ForceReattemptException fre) { + } catch (ForceReattemptException ignore) { failures.add(id); - } catch (PRLocallyDestroyedException prlde) { + } catch (PRLocallyDestroyedException ignore) { failures.add(id); } return result; @@ -4675,12 +4629,6 @@ public class PartitionedRegion extends LocalRegion /** * Sends FetchBulkEntriesMessage to each of the nodes hosting the buckets, unless the nodes
<TRUNCATED>