http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java index 172fabe..aa40508 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java @@ -65,9 +65,9 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl { /* * (non-Javadoc) - * + * * @see org.apache.geode.internal.cache.TXStateInterface#commit() - * + * * [DISTTX] TODO Catch all exceptions in precommit and rollback and make sure these messages reach * all */ @@ -295,7 +295,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl { /* * [DISTTX] TODO Write similar method to take out exception - * + * * [DISTTX] TODO Handle Reliable regions */ // if (this.hasReliableRegions) { @@ -551,7 +551,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl { /* * [DISTTX] TODO Write similar method to take out exception - * + * * [DISTTX] TODO Handle Reliable regions */ // if (this.hasReliableRegions) { @@ -566,7 +566,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl { /* * Handle response of precommit reply - * + * * Go over list of region versions for this target and fill map */ private void populateEntryEventMap(DistributedMember target, @@ -728,7 +728,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl { /* * [DISTTX] TODO Write similar method to take out exception - * + * * [DISTTX] TODO Handle Reliable regions */ // if (this.hasReliableRegions) { @@ -756,7 +756,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl { super.postPutAll(putallOp, successfulPuts, region); } else { region.getCancelCriterion().checkCancelInProgress(null); // fix for bug - // #43651 + // #43651 if (logger.isDebugEnabled()) { logger.debug( @@ -835,7 +835,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl { super.postRemoveAll(op, successfulOps, region); } else { region.getCancelCriterion().checkCancelInProgress(null); // fix for bug - // #43651 + // #43651 if (logger.isDebugEnabled()) { logger.debug( "DistTXStateProxyImplOnCoordinator.postRemoveAll "
http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java index 0a9ccd8..7ba7d0c 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java @@ -12,7 +12,6 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ - package org.apache.geode.internal.cache; import java.io.DataInput; @@ -39,6 +38,7 @@ import org.apache.geode.cache.CacheEvent; import org.apache.geode.cache.CacheFactory; import org.apache.geode.cache.DiskAccessException; import org.apache.geode.cache.EntryNotFoundException; +import org.apache.geode.cache.EntryOperation; import org.apache.geode.cache.Operation; import org.apache.geode.cache.RegionDestroyedException; import org.apache.geode.cache.query.internal.cq.CqService; @@ -58,12 +58,11 @@ import org.apache.geode.distributed.internal.membership.InternalDistributedMembe import org.apache.geode.internal.Assert; import org.apache.geode.internal.CopyOnWriteHashSet; import org.apache.geode.internal.InternalDataSerializer; -import org.apache.geode.internal.Version; import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile; -import org.apache.geode.internal.cache.DistributedPutAllOperation.PutAllMessage; import org.apache.geode.internal.cache.EntryEventImpl.OldValueImporter; import org.apache.geode.internal.cache.FilterRoutingInfo.FilterInfo; import org.apache.geode.internal.cache.UpdateOperation.UpdateMessage; +import org.apache.geode.internal.cache.partitioned.Bucket; import org.apache.geode.internal.cache.partitioned.PartitionMessage; import org.apache.geode.internal.cache.persistence.PersistentMemberID; import org.apache.geode.internal.cache.tier.MessageType; @@ -75,26 +74,26 @@ import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.LogService; import org.apache.geode.internal.logging.log4j.LocalizedMessage; import org.apache.geode.internal.logging.log4j.LogMarker; +import org.apache.geode.internal.offheap.Releasable; import org.apache.geode.internal.offheap.StoredObject; import org.apache.geode.internal.offheap.annotations.Released; import org.apache.geode.internal.offheap.annotations.Unretained; import org.apache.geode.internal.sequencelog.EntryLogger; import org.apache.geode.internal.util.DelayedAction; -/** - * - */ public abstract class DistributedCacheOperation { private static final Logger logger = LogService.getLogger(); public static double LOSS_SIMULATION_RATIO = 0; // test hook + public static Random LOSS_SIMULATION_GENERATOR; public static long SLOW_DISTRIBUTION_MS = 0; // test hook // constants used in subclasses and distribution messages // should use enum in source level 1.5+ + /** * Deserialization policy: do not deserialize (for byte array, null or cases where the value * should stay serialized) @@ -145,11 +144,12 @@ public abstract class DistributedCacheOperation { } - public final static byte DESERIALIZATION_POLICY_NUMBITS = + public static final byte DESERIALIZATION_POLICY_NUMBITS = DistributionMessage.getNumBits(DESERIALIZATION_POLICY_LAZY); public static final short DESERIALIZATION_POLICY_END = (short) (1 << DESERIALIZATION_POLICY_NUMBITS); + public static final short DESERIALIZATION_POLICY_MASK = (short) (DESERIALIZATION_POLICY_END - 1); public static boolean testSendingOldValues; @@ -263,7 +263,7 @@ public abstract class DistributedCacheOperation { try { _distribute(); } catch (InvalidVersionException e) { - if (logger.isDebugEnabled()) { + if (logger.isTraceEnabled()) { logger.trace(LogMarker.DM, "PutAll failed since versions were missing; retrying again", e); } @@ -283,7 +283,7 @@ public abstract class DistributedCacheOperation { DistributedRegion region = getRegion(); if (viewVersion != -1) { region.getDistributionAdvisor().endOperation(viewVersion); - if (logger.isDebugEnabled()) { + if (logger.isTraceEnabled()) { logger.trace(LogMarker.STATE_FLUSH_OP, "done dispatching operation in view version {}", viewVersion); } @@ -317,7 +317,7 @@ public abstract class DistributedCacheOperation { if (SLOW_DISTRIBUTION_MS > 0) { // test hook try { Thread.sleep(SLOW_DISTRIBUTION_MS); - } catch (InterruptedException e) { + } catch (InterruptedException ignore) { Thread.currentThread().interrupt(); } SLOW_DISTRIBUTION_MS = 0; @@ -335,15 +335,15 @@ public abstract class DistributedCacheOperation { } // some members requiring old value are also in the cache op recipients set - Set needsOldValueInCacheOp = Collections.EMPTY_SET; + Set needsOldValueInCacheOp = Collections.emptySet(); // set client routing information into the event boolean routingComputed = false; FilterRoutingInfo filterRouting = null; // recipients that will get a cacheop msg and also a PR message - Set twoMessages = Collections.EMPTY_SET; + Set twoMessages = Collections.emptySet(); if (region.isUsedForPartitionedRegionBucket()) { - twoMessages = ((BucketRegion) region).getBucketAdvisor().adviseRequiresTwoMessages(); + twoMessages = ((Bucket) region).getBucketAdvisor().adviseRequiresTwoMessages(); routingComputed = true; filterRouting = getRecipientFilterRouting(recipients); if (filterRouting != null) { @@ -355,7 +355,7 @@ public abstract class DistributedCacheOperation { // some members need PR notification of the change for client/wan // notification - Set adjunctRecipients = Collections.EMPTY_SET; + Set adjunctRecipients = Collections.emptySet(); // Partitioned region listener notification messages piggyback on this // operation's replyprocessor and need to be sent at the same time as @@ -377,20 +377,17 @@ public abstract class DistributedCacheOperation { recipients.removeAll(needsOldValueInCacheOp); } - Set cachelessNodes = Collections.EMPTY_SET; - Set adviseCacheServers = Collections.EMPTY_SET; - Set<InternalDistributedMember> cachelessNodesWithNoCacheServer = - new HashSet<InternalDistributedMember>(); + Set cachelessNodes = Collections.emptySet(); + Set adviseCacheServers; + Set<InternalDistributedMember> cachelessNodesWithNoCacheServer = new HashSet<>(); if (region.getDistributionConfig().getDeltaPropagation() && this.supportsDeltaPropagation()) { cachelessNodes = region.getCacheDistributionAdvisor().adviseEmptys(); if (!cachelessNodes.isEmpty()) { List list = new ArrayList(cachelessNodes); for (Object member : cachelessNodes) { - if (!recipients.contains(member)) { + if (!recipients.contains(member) || adjunctRecipients.contains(member)) { // Don't include those originally excluded. list.remove(member); - } else if (adjunctRecipients.contains(member)) { - list.remove(member); } } cachelessNodes.clear(); @@ -421,10 +418,10 @@ public abstract class DistributedCacheOperation { if (!reliableOp || region.isNoDistributionOk()) { // nothing needs be done in this case } else { - region.handleReliableDistribution(Collections.EMPTY_SET); + region.handleReliableDistribution(Collections.emptySet()); } - /** compute local client routing before waiting for an ack only for a bucket */ + // compute local client routing before waiting for an ack only for a bucket if (region.isUsedForPartitionedRegionBucket()) { FilterInfo filterInfo = getLocalFilterRouting(filterRouting); this.event.setLocalFilterInfo(filterInfo); @@ -433,7 +430,7 @@ public abstract class DistributedCacheOperation { } else { boolean directAck = false; boolean useMulticast = region.getMulticastEnabled() - && region.getSystem().getConfig().getMcastPort() != 0 && this.supportsMulticast();; + && region.getSystem().getConfig().getMcastPort() != 0 && this.supportsMulticast(); boolean shouldAck = shouldAck(); if (shouldAck) { @@ -491,7 +488,7 @@ public abstract class DistributedCacheOperation { recipients); } waitForMembers.removeAll(recipients); - recipients = Collections.EMPTY_SET; + recipients = Collections.emptySet(); } } if (reliableOp) { @@ -625,7 +622,7 @@ public abstract class DistributedCacheOperation { } adjunctRecipientsWithNoCacheServer.addAll(adjunctRecipients); - adviseCacheServers = ((BucketRegion) region).getPartitionedRegion() + adviseCacheServers = ((Bucket) region).getPartitionedRegion() .getCacheDistributionAdvisor().adviseCacheServers(); adjunctRecipientsWithNoCacheServer.removeAll(adviseCacheServers); @@ -652,7 +649,7 @@ public abstract class DistributedCacheOperation { } } - /** compute local client routing before waiting for an ack only for a bucket */ + // compute local client routing before waiting for an ack only for a bucket if (region.isUsedForPartitionedRegionBucket()) { FilterInfo filterInfo = getLocalFilterRouting(filterRouting); event.setLocalFilterInfo(filterInfo); @@ -693,7 +690,6 @@ public abstract class DistributedCacheOperation { } } - /** * Cleanup destroyed events in CQ result cache for remote CQs. While maintaining the CQ results * key caching. the destroy event keys are marked as destroyed instead of removing them, this is @@ -710,7 +706,7 @@ public abstract class DistributedCacheOperation { continue; } - CacheProfile cf = (CacheProfile) ((BucketRegion) getRegion()).getPartitionedRegion() + CacheProfile cf = (CacheProfile) ((Bucket) getRegion()).getPartitionedRegion() .getCacheDistributionAdvisor().getProfile(m); if (cf == null || cf.filterProfile == null || cf.filterProfile.isLocalProfile() @@ -718,7 +714,6 @@ public abstract class DistributedCacheOperation { continue; } - for (Object value : cf.filterProfile.getCqMap().values()) { ServerCQ cq = (ServerCQ) value; @@ -726,16 +721,14 @@ public abstract class DistributedCacheOperation { Long cqID = e.getKey(); // For the CQs satisfying the event with destroy CQEvent, remove // the entry form CQ cache. - if (cq.getFilterID() == cqID - && (e.getValue().equals(Integer.valueOf(MessageType.LOCAL_DESTROY)))) { - cq.removeFromCqResultKeys(((EntryEventImpl) event).getKey(), true); + if (cq.getFilterID() == cqID && (e.getValue().equals(MessageType.LOCAL_DESTROY))) { + cq.removeFromCqResultKeys(((EntryOperation) event).getKey(), true); } } } } } - /** * Get the adjunct receivers for a partitioned region operation * @@ -752,9 +745,6 @@ public abstract class DistributedCacheOperation { /** * perform any operation-specific initialization on the given reply processor - * - * @param p - * @param msg */ protected void initProcessor(CacheOperationReplyProcessor p, CacheOperationMessage msg) { // nothing to do here - see UpdateMessage @@ -783,9 +773,6 @@ public abstract class DistributedCacheOperation { } } - /** - * @param closedMembers - */ private void handleClosedMembers(Set<InternalDistributedMember> closedMembers, Map<InternalDistributedMember, PersistentMemberID> persistentIds) { if (persistentIds == null) { @@ -837,11 +824,7 @@ public abstract class DistributedCacheOperation { return null; } CacheDistributionAdvisor advisor; - // if (region.isUsedForPartitionedRegionBucket()) { - advisor = ((BucketRegion) region).getPartitionedRegion().getCacheDistributionAdvisor(); - // } else { - // advisor = ((DistributedRegion)region).getCacheDistributionAdvisor(); - // } + advisor = region.getPartitionedRegion().getCacheDistributionAdvisor(); return advisor.adviseFilterRouting(this.event, cacheOpRecipients); } @@ -915,7 +898,6 @@ public abstract class DistributedCacheOperation { protected final static short PERSISTENT_TAG_MASK = (VERSION_TAG_MASK << 1); protected final static short UNRESERVED_FLAGS_START = (PERSISTENT_TAG_MASK << 1); - private final static int INHIBIT_NOTIFICATIONS_MASK = 0x400; public boolean needsRouting; @@ -959,7 +941,6 @@ public abstract class DistributedCacheOperation { return this.op; } - /** sets the concurrency versioning tag for this message */ public void setVersionTag(VersionTag tag) { this.versionTag = tag; @@ -1001,8 +982,6 @@ public abstract class DistributedCacheOperation { /** * process a reply * - * @param reply - * @param processor * @return true if the reply-processor should continue to process this response */ boolean processReply(ReplyMessage reply, CacheOperationReplyProcessor processor) { @@ -1019,13 +998,11 @@ public abstract class DistributedCacheOperation { * @param event the entry event that contains the old value */ public void appendOldValueToMessage(EntryEventImpl event) { - { - @Unretained - Object val = event.getRawOldValue(); - if (val == null || val == Token.NOT_AVAILABLE || val == Token.REMOVED_PHASE1 - || val == Token.REMOVED_PHASE2 || val == Token.DESTROYED || val == Token.TOMBSTONE) { - return; - } + @Unretained + Object val = event.getRawOldValue(); + if (val == null || val == Token.NOT_AVAILABLE || val == Token.REMOVED_PHASE1 + || val == Token.REMOVED_PHASE2 || val == Token.DESTROYED || val == Token.TOMBSTONE) { + return; } event.exportOldValue(this); } @@ -1086,7 +1063,7 @@ public abstract class DistributedCacheOperation { protected LocalRegion getLocalRegionForProcessing(DistributionManager dm) { Assert.assertTrue(this.regionPath != null, "regionPath was null"); - GemFireCacheImpl gfc = (GemFireCacheImpl) CacheFactory.getInstance(dm.getSystem()); + InternalCache gfc = (InternalCache) CacheFactory.getInstance(dm.getSystem()); return gfc.getRegionByPathForProcessing(this.regionPath); } @@ -1112,7 +1089,7 @@ public abstract class DistributedCacheOperation { final LocalRegion lclRgn = getLocalRegionForProcessing(dm); sendReply = false; basicProcess(dm, lclRgn); - } catch (CancelException e) { + } catch (CancelException ignore) { this.closed = true; if (logger.isDebugEnabled()) { logger.debug("{} Cancelled: nothing to do", this); @@ -1203,7 +1180,7 @@ public abstract class DistributedCacheOperation { // region if (!rgn.isEventTrackerInitialized() && (rgn.getDataPolicy().withReplication() || rgn.getDataPolicy().withPreloaded())) { - if (logger.isDebugEnabled()) { + if (logger.isTraceEnabled()) { logger.trace(LogMarker.DM_BRIDGE_SERVER, "Ignoring possible duplicate event"); } return; @@ -1213,15 +1190,15 @@ public abstract class DistributedCacheOperation { sendReply = operateOnRegion(event, dm) && sendReply; } finally { if (event instanceof EntryEventImpl) { - ((EntryEventImpl) event).release(); + ((Releasable) event).release(); } } - } catch (RegionDestroyedException e) { + } catch (RegionDestroyedException ignore) { this.closed = true; if (logger.isDebugEnabled()) { logger.debug("{} Region destroyed: nothing to do", this); } - } catch (CancelException e) { + } catch (CancelException ignore) { this.closed = true; if (logger.isDebugEnabled()) { logger.debug("{} Cancelled: nothing to do", this); @@ -1231,7 +1208,7 @@ public abstract class DistributedCacheOperation { if (!lclRgn.isDestroyed()) { logger.error("Got disk access exception, expected region to be destroyed", e); } - } catch (EntryNotFoundException e) { + } catch (EntryNotFoundException ignore) { this.appliedOperation = true; if (logger.isDebugEnabled()) { logger.debug("{} Entry not found, nothing to do", this); @@ -1275,8 +1252,7 @@ public abstract class DistributedCacheOperation { if (pId == 0 && (dm instanceof DM) && !this.directAck) {// Fix for #41871 // distributed-no-ack message. Don't respond } else { - ReplyException exception = rex; - ReplyMessage.send(recipient, pId, exception, dm, !this.appliedOperation, this.closed, false, + ReplyMessage.send(recipient, pId, rex, dm, !this.appliedOperation, this.closed, false, isInternal()); } } @@ -1312,9 +1288,6 @@ public abstract class DistributedCacheOperation { * When an event is discarded because of an attempt to overwrite a more recent change we still * need to deliver that event to clients. Clients can then perform their own concurrency checks * on the event. - * - * @param rgn - * @param ev */ protected void dispatchElidedEvent(LocalRegion rgn, EntryEventImpl ev) { if (logger.isDebugEnabled()) { @@ -1325,11 +1298,6 @@ public abstract class DistributedCacheOperation { rgn.notifyBridgeClients(ev); } - // protected LocalRegion getRegionFromPath(InternalDistributedSystem sys, - // String path) { - // return LocalRegion.getRegionFromPath(sys, path); - // } - protected abstract InternalCacheEvent createEvent(DistributedRegion rgn) throws EntryNotFoundException; @@ -1380,7 +1348,6 @@ public abstract class DistributedCacheOperation { @Override public void fromData(DataInput in) throws IOException, ClassNotFoundException { - // super.fromData(in); short bits = in.readShort(); short extBits = in.readShort(); this.flags = bits; @@ -1424,8 +1391,6 @@ public abstract class DistributedCacheOperation { @Override public void toData(DataOutput out) throws IOException { - // super.toData(out); - short bits = 0; short extendedBits = 0; bits = computeCompressedShort(bits); @@ -1611,8 +1576,7 @@ public abstract class DistributedCacheOperation { static class CacheOperationReplyProcessor extends DirectReplyProcessor { public CacheOperationMessage msg; - public CopyOnWriteHashSet<InternalDistributedMember> closedMembers = - new CopyOnWriteHashSet<InternalDistributedMember>(); + public CopyOnWriteHashSet<InternalDistributedMember> closedMembers = new CopyOnWriteHashSet<>(); public CacheOperationReplyProcessor(InternalDistributedSystem system, Collection initMembers) { super(system, initMembers);
