http://git-wip-us.apache.org/repos/asf/geode/blob/d319d129/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java index 7c2a3e3..1113c82 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java @@ -12,14 +12,12 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ - package org.apache.geode.internal.cache; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -65,6 +63,7 @@ import org.apache.geode.internal.Version; import org.apache.geode.internal.cache.locks.TXLockId; import org.apache.geode.internal.cache.locks.TXLockIdImpl; import org.apache.geode.internal.cache.locks.TXLockService; +import org.apache.geode.internal.cache.partitioned.Bucket; import org.apache.geode.internal.cache.persistence.PersistentMemberID; import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID; import org.apache.geode.internal.cache.versions.VersionSource; @@ -80,7 +79,6 @@ import org.apache.geode.internal.offheap.annotations.Released; * commit, to other cache members. * * @since GemFire 4.0 - * */ public class TXCommitMessage extends PooledDistributionMessage implements MembershipListener, MessageWithReply { @@ -98,12 +96,10 @@ public class TXCommitMessage extends PooledDistributionMessage protected transient DM dm; // Used on the sending side of this message private transient int sequenceNum = 0; - private transient HashMap<InternalDistributedMember, RegionCommitList> msgMap = null; // Maps - // receiver - // Serializables - // to - // RegionCommitList - // instances + + // Maps receiver Serializables to RegionCommitList instances + private transient HashMap<InternalDistributedMember, RegionCommitList> msgMap = null; + private transient RegionCommit currentRegion; protected transient TXState txState = null; private transient boolean wasProcessed; @@ -124,7 +120,7 @@ public class TXCommitMessage extends PooledDistributionMessage private transient boolean hasReliableRegions = false; /** Set of all caching exceptions produced hile processing this tx */ - private transient Set processingExceptions = Collections.EMPTY_SET; + private transient Set processingExceptions = Collections.emptySet(); private transient ClientProxyMembershipID bridgeContext = null; @@ -149,15 +145,6 @@ public class TXCommitMessage extends PooledDistributionMessage * transaction */ public static final TXCommitMessage EXCEPTION_MSG = new TXCommitMessage(); - /* - * /** this is set if this message should deserialize the WAN shadowKey sent by the sender. Sender - * will not send shadowKeys when there is a mix of 7.0 and 7.0.1 members - * - * private transient boolean shouldReadShadowKey; /** this is set if the sender has decided to - * send WAN shadowKey for 7.0.1 members - * - * private transient boolean shouldWriteShadowKey; - */ public TXCommitMessage(TXId txIdent, DM dm, TXState txState) { this.dm = dm; @@ -176,7 +163,7 @@ public class TXCommitMessage extends PooledDistributionMessage // zero arg constructor for DataSerializer } - static public TXFarSideCMTracker getTracker() { + public static TXFarSideCMTracker getTracker() { return TXCommitMessage.txTracker; } @@ -194,7 +181,7 @@ public class TXCommitMessage extends PooledDistributionMessage * Return the TXCommitMessage we have already received that is associated with id. Note because of * bug 37657 we may need to wait for it to show up. */ - static public TXCommitMessage waitForMessage(Object id, DM dm) { + public static TXCommitMessage waitForMessage(Object id, DM dm) { TXFarSideCMTracker map = getTracker(); return map.waitForMessage(id, dm); } @@ -210,12 +197,10 @@ public class TXCommitMessage extends PooledDistributionMessage // make sure we have some changes and someone to send them to if (!this.currentRegion.isEmpty() && s != null && !s.isEmpty()) { // Get the persistent ids for the current region and save them - Map<InternalDistributedMember, PersistentMemberID> persistentIds = - getPersistentIds(this.currentRegion.r); - this.currentRegion.persistentIds = persistentIds; + this.currentRegion.persistentIds = getPersistentIds(this.currentRegion.r); if (this.msgMap == null) { - this.msgMap = new HashMap<InternalDistributedMember, RegionCommitList>(); + this.msgMap = new HashMap<>(); } { RegionCommitList newRCL = null; @@ -245,18 +230,18 @@ public class TXCommitMessage extends PooledDistributionMessage } } } - { // Now deal with each existing recipient that does not care - // about this region - Iterator<Map.Entry<InternalDistributedMember, RegionCommitList>> it = - this.msgMap.entrySet().iterator(); - while (it.hasNext()) { - Map.Entry<InternalDistributedMember, RegionCommitList> me = it.next(); - if (!s.contains(me.getKey())) { - RegionCommitList rcl = me.getValue(); - RegionCommitList trimmedRcl = rcl.trim(this.currentRegion); - if (trimmedRcl != rcl) { - me.setValue(trimmedRcl); - } + + // Now deal with each existing recipient that does not care + // about this region + Iterator<Map.Entry<InternalDistributedMember, RegionCommitList>> it = + this.msgMap.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry<InternalDistributedMember, RegionCommitList> me = it.next(); + if (!s.contains(me.getKey())) { + RegionCommitList rcl = me.getValue(); + RegionCommitList trimmedRcl = rcl.trim(this.currentRegion); + if (trimmedRcl != rcl) { + me.setValue(trimmedRcl); } } } @@ -264,13 +249,11 @@ public class TXCommitMessage extends PooledDistributionMessage this.currentRegion = null; } - - private Map<InternalDistributedMember, PersistentMemberID> getPersistentIds(LocalRegion r) { if (r instanceof DistributedRegion) { - return ((DistributedRegion) r).getCacheDistributionAdvisor().advisePersistentMembers(); + return ((CacheDistributionAdvisee) r).getCacheDistributionAdvisor().advisePersistentMembers(); } else { - return Collections.EMPTY_MAP; + return Collections.emptyMap(); } } @@ -287,17 +270,15 @@ public class TXCommitMessage extends PooledDistributionMessage this.currentRegion = null; } - Map viewVersions = new HashMap(); private Boolean needsLargeModCount; private transient boolean disableListeners = false; - /** record CacheDistributionAdvisor.startOperation versions for later cleanup */ protected void addViewVersion(DistributedRegion dr, long version) { - viewVersions.put(dr, Long.valueOf(version)); + viewVersions.put(dr, version); } protected void releaseViewVersions() { @@ -309,7 +290,7 @@ public class TXCommitMessage extends PooledDistributionMessage // need to continue the iteration if one of the regions is destroyed // since others may still be okay try { - long newv = dr.getDistributionAdvisor().endOperation(viewVersion.longValue()); + long newv = dr.getDistributionAdvisor().endOperation(viewVersion); } catch (RuntimeException ex) { rte = ex; } @@ -504,13 +485,13 @@ public class TXCommitMessage extends PooledDistributionMessage } Set cacheClosedMembers = - (processor == null) ? Collections.EMPTY_SET : processor.getCacheClosedMembers(); + (processor == null) ? Collections.emptySet() : processor.getCacheClosedMembers(); Set departedMembers = - (processor == null) ? Collections.EMPTY_SET : processor.getDepartedMembers(); + (processor == null) ? Collections.emptySet() : processor.getDepartedMembers(); // check reliability on each region - Set regionDistributionExceptions = Collections.EMPTY_SET; - Set failedRegionNames = Collections.EMPTY_SET; + Set regionDistributionExceptions = Collections.emptySet(); + Set failedRegionNames = Collections.emptySet(); for (Iterator iter = regionToRecipients.entrySet().iterator(); iter.hasNext();) { Map.Entry me = (Map.Entry) iter.next(); final RegionCommit rc = (RegionCommit) me.getKey(); @@ -519,7 +500,7 @@ public class TXCommitMessage extends PooledDistributionMessage successfulRecipients.removeAll(departedMembers); // remove members who destroyed that region or closed their cache - Set regionDestroyedMembers = (processor == null) ? Collections.EMPTY_SET + Set regionDestroyedMembers = (processor == null) ? Collections.emptySet() : processor.getRegionDestroyedMembers(rc.r.getFullPath()); successfulRecipients.removeAll(cacheClosedMembers); @@ -528,7 +509,7 @@ public class TXCommitMessage extends PooledDistributionMessage try { rc.r.handleReliableDistribution(successfulRecipients); } catch (RegionDistributionException e) { - if (regionDistributionExceptions == Collections.EMPTY_SET) { + if (regionDistributionExceptions == Collections.emptySet()) { regionDistributionExceptions = new HashSet(); failedRegionNames = new HashSet(); } @@ -545,11 +526,10 @@ public class TXCommitMessage extends PooledDistributionMessage } } - /** * Helper method for send */ - private final void setRecipientsSendData(Set recipients, ReplyProcessor21 processor, + private void setRecipientsSendData(Set recipients, ReplyProcessor21 processor, RegionCommitList rcl) { setRecipients(recipients); this.regions = rcl; @@ -601,22 +581,21 @@ public class TXCommitMessage extends PooledDistributionMessage this.farSideEntryOps.add(entryOp); } - protected final void addProcessingException(Exception e) { + protected void addProcessingException(Exception e) { // clear all previous exceptions if e is a CacheClosedException - if (this.processingExceptions == Collections.EMPTY_SET || e instanceof CancelException) { + if (this.processingExceptions == Collections.emptySet() || e instanceof CancelException) { this.processingExceptions = new HashSet(); } this.processingExceptions.add(e); } - public void setDM(DM dm) { this.dm = dm; } public DM getDM() { if (this.dm == null) { - GemFireCacheImpl cache = GemFireCacheImpl.getExisting("Applying TXCommit"); + InternalCache cache = GemFireCacheImpl.getExisting("Applying TXCommit"); this.dm = cache.getDistributionManager(); } return this.dm; @@ -639,12 +618,9 @@ public class TXCommitMessage extends PooledDistributionMessage if (logger.isDebugEnabled()) { logger.debug("begin processing TXCommitMessage for {}", this.txIdent); } + // do this before CacheFactory.getInstance for bug 33471 final int oldLevel = - LocalRegion.setThreadInitLevelRequirement(LocalRegion.BEFORE_INITIAL_IMAGE); // do this - // before - // CacheFactory.getInstance - // for bug - // 33471 + LocalRegion.setThreadInitLevelRequirement(LocalRegion.BEFORE_INITIAL_IMAGE); boolean forceListener = false; // this gets flipped if we need to fire tx listener // it needs to default to false because we don't want to fire listeners on pr replicates try { @@ -662,20 +638,18 @@ public class TXCommitMessage extends PooledDistributionMessage try { // Pre-process each Region in the tx try { - { - Iterator it = this.regions.iterator(); - while (it.hasNext()) { - boolean failedBeginProcess = true; - RegionCommit rc = (RegionCommit) it.next(); - try { - failedBeginProcess = !rc.beginProcess(dm, this.txIdent, txEvent); - } catch (CacheRuntimeException problem) { - processCacheRuntimeException(problem); - } finally { - if (failedBeginProcess) { - rc.r = null; // Cause related FarSideEntryOps to skip processing - it.remove(); // Skip endProcessing as well - } + Iterator it = this.regions.iterator(); + while (it.hasNext()) { + boolean failedBeginProcess = true; + RegionCommit rc = (RegionCommit) it.next(); + try { + failedBeginProcess = !rc.beginProcess(dm, this.txIdent, txEvent); + } catch (CacheRuntimeException problem) { + processCacheRuntimeException(problem); + } finally { + if (failedBeginProcess) { + rc.r = null; // Cause related FarSideEntryOps to skip processing + it.remove(); // Skip endProcessing as well } } } @@ -746,22 +720,20 @@ public class TXCommitMessage extends PooledDistributionMessage } public void basicProcessOps() { - { - List<EntryEventImpl> pendingCallbacks = new ArrayList<>(this.farSideEntryOps.size()); - Collections.sort(this.farSideEntryOps); - Iterator it = this.farSideEntryOps.iterator(); - while (it.hasNext()) { - try { - RegionCommit.FarSideEntryOp entryOp = (RegionCommit.FarSideEntryOp) it.next(); - entryOp.process(pendingCallbacks); - } catch (CacheRuntimeException problem) { - processCacheRuntimeException(problem); - } catch (Exception e) { - addProcessingException(e); - } + List<EntryEventImpl> pendingCallbacks = new ArrayList<>(this.farSideEntryOps.size()); + Collections.sort(this.farSideEntryOps); + Iterator it = this.farSideEntryOps.iterator(); + while (it.hasNext()) { + try { + RegionCommit.FarSideEntryOp entryOp = (RegionCommit.FarSideEntryOp) it.next(); + entryOp.process(pendingCallbacks); + } catch (CacheRuntimeException problem) { + processCacheRuntimeException(problem); + } catch (Exception e) { + addProcessingException(e); } - firePendingCallbacks(pendingCallbacks); } + firePendingCallbacks(pendingCallbacks); } private void firePendingCallbacks(List<EntryEventImpl> callbacks) { @@ -963,7 +935,7 @@ public class TXCommitMessage extends PooledDistributionMessage @Override public String toString() { - StringBuffer result = new StringBuffer(256); + StringBuilder result = new StringBuilder(256); result.append("TXCommitMessage@").append(System.identityHashCode(this)).append("#") .append(this.sequenceNum).append(" processorId=").append(this.processorId).append(" txId=") .append(this.txIdent); @@ -994,7 +966,6 @@ public class TXCommitMessage extends PooledDistributionMessage * that represents an entire transaction. At commit time the txCommitMessage sent to each node can * be a subset of the transaction, this method will combine those subsets into a complete message. * - * @param msgSet * @return the complete txCommitMessage */ public static TXCommitMessage combine(Set<TXCommitMessage> msgSet) { @@ -1014,8 +985,6 @@ public class TXCommitMessage extends PooledDistributionMessage /** * Combines the other TXCommitMessage into this message. Used to compute complete TXCommitMessage * from parts. - * - * @param other */ public void combine(TXCommitMessage other) { assert other != null; @@ -1032,7 +1001,7 @@ public class TXCommitMessage extends PooledDistributionMessage } } - public final static class RegionCommitList extends ArrayList<RegionCommit> { + public static class RegionCommitList extends ArrayList<RegionCommit> { private static final long serialVersionUID = -8910813949027683641L; private transient boolean needsAck = false; private transient RegionCommit trimRC = null; @@ -1091,7 +1060,7 @@ public class TXCommitMessage extends PooledDistributionMessage @Override public String toString() { - StringBuffer result = new StringBuffer(256); + StringBuilder result = new StringBuilder(256); result.append('@').append(System.identityHashCode(this)).append(' ').append(super.toString()); return result.toString(); } @@ -1390,8 +1359,6 @@ public class TXCommitMessage extends PooledDistributionMessage } } - - boolean isEmpty() { return this.opKeys == null; } @@ -1444,7 +1411,7 @@ public class TXCommitMessage extends PooledDistributionMessage @Override public String toString() { - StringBuffer result = new StringBuffer(64); + StringBuilder result = new StringBuilder(64); if (this.regionPath != null) { result.append(this.regionPath); } else { @@ -1460,8 +1427,7 @@ public class TXCommitMessage extends PooledDistributionMessage if (this.r != null) { DataSerializer.writeString(this.r.getFullPath(), out); if (this.r instanceof BucketRegion) { - DataSerializer.writeString(((BucketRegion) this.r).getPartitionedRegion().getFullPath(), - out); + DataSerializer.writeString(((Bucket) this.r).getPartitionedRegion().getFullPath(), out); } else { DataSerializer.writeString(null, out); } @@ -1557,7 +1523,6 @@ public class TXCommitMessage extends PooledDistributionMessage * @param in the data input that is used to read the data for this entry op * @param largeModCount true if the mod count is a int instead of a byte. * @param readShadowKey true if a long shadowKey should be read - * @throws ClassNotFoundException */ public void fromData(DataInput in, boolean largeModCount, boolean readShadowKey) throws IOException, ClassNotFoundException { @@ -1665,7 +1630,7 @@ public class TXCommitMessage extends PooledDistributionMessage } } - final Object getTrackerKey() { + Object getTrackerKey() { if (this.lockId != null) { return this.lockId; } else { @@ -1677,41 +1642,39 @@ public class TXCommitMessage extends PooledDistributionMessage * Used to prevent processing of the message if we have reported to other FarSiders that we did * not received the CommitProcessMessage */ - final boolean dontProcess() { + boolean dontProcess() { return this.dontProcess; } /** * Indicate that this message should not be processed if we receive CommitProcessMessage (late) */ - final void setDontProcess() { + void setDontProcess() { this.dontProcess = true; } - final boolean isProcessing() { + boolean isProcessing() { return this.isProcessing; } - private final void setIsProcessing(boolean isProcessing) { + private void setIsProcessing(boolean isProcessing) { this.isProcessing = isProcessing; } - final boolean wasProcessed() { + boolean wasProcessed() { return this.wasProcessed; } - final void setProcessed(boolean wasProcessed) { + void setProcessed(boolean wasProcessed) { this.wasProcessed = wasProcessed; } - /********************* Region Commit Process Messages ***************************************/ - /** * The CommitProcessForLockIDMessaage is sent by the Distributed ACK TX origin to the recipients * (aka FarSiders) to indicate that a previously received RegionCommit that contained a lockId * should commence processing. */ - static final public class CommitProcessForLockIdMessage extends CommitProcessMessage { + public static class CommitProcessForLockIdMessage extends CommitProcessMessage { private TXLockId lockId; public CommitProcessForLockIdMessage() { @@ -1749,7 +1712,7 @@ public class TXCommitMessage extends PooledDistributionMessage @Override public String toString() { - StringBuffer result = new StringBuffer(128); + StringBuilder result = new StringBuilder(128); result.append("CommitProcessForLockIdMessage@").append(System.identityHashCode(this)) .append(" lockId=").append(this.lockId); return result.toString(); @@ -1763,7 +1726,7 @@ public class TXCommitMessage extends PooledDistributionMessage * typically sent if all the TX changes are a result of load/netsearch/netload values (thus no * lockid) */ - static final public class CommitProcessForTXIdMessage extends CommitProcessMessage { + public static class CommitProcessForTXIdMessage extends CommitProcessMessage { private TXId txId; public CommitProcessForTXIdMessage() { @@ -1801,14 +1764,15 @@ public class TXCommitMessage extends PooledDistributionMessage @Override public String toString() { - StringBuffer result = new StringBuffer(128); + StringBuilder result = new StringBuilder(128); result.append("CommitProcessForTXIdMessage@").append(System.identityHashCode(this)) .append(" txId=").append(this.txId); return result.toString(); } } - static abstract public class CommitProcessMessage extends PooledDistributionMessage { - protected final void basicProcess(final TXCommitMessage mess, final DistributionManager dm) { + + public abstract static class CommitProcessMessage extends PooledDistributionMessage { + protected void basicProcess(final TXCommitMessage mess, final DistributionManager dm) { dm.removeMembershipListener(mess); synchronized (mess) { if (mess.dontProcess()) { @@ -1823,8 +1787,6 @@ public class TXCommitMessage extends PooledDistributionMessage } } - /********************* Commit Process Query Message ***************************************/ - /** * The CommitProcessQueryMessage is used to attempt to recover - in the Distributed ACK TXs - when * the origin of the CommitProcess messages departed from the distributed system. The sender of @@ -1835,9 +1797,8 @@ public class TXCommitMessage extends PooledDistributionMessage * about the the tracker key - opting not to have specific messages for each type like * CommitProcessFor<Lock/TX>Id - and take the performance penalty of an extra call to * DataSerializer - * */ - static final public class CommitProcessQueryMessage extends PooledDistributionMessage { + public static class CommitProcessQueryMessage extends PooledDistributionMessage { private Object trackerKey; // Either a TXLockId or a TXId private int processorId; @@ -1885,7 +1846,7 @@ public class TXCommitMessage extends PooledDistributionMessage @Override public String toString() { - StringBuffer result = new StringBuffer(128); + StringBuilder result = new StringBuilder(128); result.append("CommitProcessQueryMessage@").append(System.identityHashCode(this)) .append(" trackerKeyClass=").append(this.trackerKey.getClass().getName()) .append(" trackerKey=").append(this.trackerKey).append(" processorId=") @@ -1895,7 +1856,7 @@ public class TXCommitMessage extends PooledDistributionMessage } /********************* Commit Process Query Response Message **********************************/ - static final public class CommitProcessQueryReplyMessage extends ReplyMessage { + public static class CommitProcessQueryReplyMessage extends ReplyMessage { private boolean wasReceived; public CommitProcessQueryReplyMessage(boolean wasReceived) { @@ -1929,7 +1890,7 @@ public class TXCommitMessage extends PooledDistributionMessage @Override public String toString() { - StringBuffer result = new StringBuffer(128); + StringBuilder result = new StringBuilder(128); result.append("CommitProcessQueryReplyMessage@").append(System.identityHashCode(this)) .append(" wasReceived=").append(this.wasReceived).append(" processorId=") .append(this.processorId).append(" from ").append(this.getSender()); @@ -1938,7 +1899,7 @@ public class TXCommitMessage extends PooledDistributionMessage } /********************* Commit Process Query Response Processor *********************************/ - static final public class CommitProcessQueryReplyProcessor extends ReplyProcessor21 { + public static class CommitProcessQueryReplyProcessor extends ReplyProcessor21 { public boolean receivedOnePositive; CommitProcessQueryReplyProcessor(DM dm, Set members) { @@ -1956,17 +1917,19 @@ public class TXCommitMessage extends PooledDistributionMessage } @Override - final protected boolean canStopWaiting() { + protected boolean canStopWaiting() { return this.receivedOnePositive; } - final public boolean receivedACommitProcessMessage() { + public boolean receivedACommitProcessMessage() { return this.receivedOnePositive; } } /********************* MembershipListener Implementation ***************************************/ - public void memberJoined(InternalDistributedMember id) {} + public void memberJoined(InternalDistributedMember id) { + // do nothing + } public void memberSuspect(InternalDistributedMember id, InternalDistributedMember whoSuspected, String reason) {} @@ -2118,58 +2081,6 @@ public class TXCommitMessage extends PooledDistributionMessage } } - // /** Custom subclass that keeps all ReplyExceptions */ - // private class ReliableCommitReplyProcessor extends ReliableReplyProcessor21 { - // - // /** Set of members that threw CacheClosedExceptions */ - // private Set cacheExceptions = new HashSet(); - // /** key=region path, value=Set of members */ - // private Map regionExceptions = new HashMap(); - // - // public ReliableCommitReplyProcessor(DM dm, - // Set initMembers) { - // super(dm, initMembers); - // } - // protected synchronized void processException(DistributionMessage msg, - // ReplyException re) { - // // only interested in CommitReplyException - // if (re instanceof CommitReplyException) { - // CommitReplyException cre = (CommitReplyException) re; - // Set exceptions = cre.getExceptions(); - // for (Iterator iter = exceptions.iterator(); iter.hasNext();) { - // Exception ex = (Exception) iter.next(); - // if (ex instanceof CacheClosedException) { - // cacheExceptions.add(msg.getSender()); - // } - // else if (ex instanceof RegionDestroyedException) { - // String r = ((RegionDestroyedException)ex).getRegionFullPath(); - // Set members = (Set) regionExceptions.get(r); - // if (members == null) { - // members = new HashSet(); - // regionExceptions.put(r, members); - // } - // members.add(msg.getSender()); - // } - // } - // } - // else { - // // allow superclass to handle all other exceptions - // super.processException(msg, re); - // } - // } - // // these two accessors should be called after wait for replies completes - // protected Set getCacheClosedMembers() { - // return this.cacheExceptions; - // } - // protected Set getRegionDestroyedMembers(String regionFullPath) { - // Set members = (Set) this.regionExceptions.get(regionFullPath); - // if (members == null) { - // members = Collections.EMPTY_SET; - // } - // return members; - // } - // } - /** * Reply processor which collects all CommitReplyExceptions and emits a detailed failure exception * if problems occur @@ -2223,7 +2134,7 @@ public class TXCommitMessage extends PooledDistributionMessage (CommitExceptionCollectingException) this.exception; return cce.getCacheClosedMembers(); } else { - return Collections.EMPTY_SET; + return Collections.emptySet(); } } @@ -2233,7 +2144,7 @@ public class TXCommitMessage extends PooledDistributionMessage (CommitExceptionCollectingException) this.exception; return cce.getRegionDestroyedMembers(regionFullPath); } else { - return Collections.EMPTY_SET; + return Collections.emptySet(); } } } @@ -2264,14 +2175,12 @@ public class TXCommitMessage extends PooledDistributionMessage /** * Determine if the commit processing was incomplete, if so throw a detailed exception * indicating the source of the problem - * - * @param msgMap */ public void handlePotentialCommitFailure( HashMap<InternalDistributedMember, RegionCommitList> msgMap) { if (fatalExceptions.size() > 0) { - StringBuffer errorMessage = new StringBuffer("Incomplete commit of transaction ").append(id) - .append(". Caused by the following exceptions: "); + StringBuilder errorMessage = new StringBuilder("Incomplete commit of transaction ") + .append(id).append(". Caused by the following exceptions: "); for (Iterator i = fatalExceptions.entrySet().iterator(); i.hasNext();) { Map.Entry me = (Map.Entry) i.next(); DistributedMember mem = (DistributedMember) me.getKey(); @@ -2366,16 +2275,13 @@ public class TXCommitMessage extends PooledDistributionMessage public Set getRegionDestroyedMembers(String regionFullPath) { Set members = (Set) this.regionExceptions.get(regionFullPath); if (members == null) { - members = Collections.EMPTY_SET; + members = Collections.emptySet(); } return members; } /** * Protected by (this) - * - * @param member - * @param exceptions */ public void addExceptionsFromMember(InternalDistributedMember member, Set exceptions) { for (Iterator iter = exceptions.iterator(); iter.hasNext();) {
http://git-wip-us.apache.org/repos/asf/geode/blob/d319d129/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java index 2948a48..a0a4d7c 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java @@ -12,16 +12,52 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ - package org.apache.geode.internal.cache; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.locks.LockSupport; + +import org.apache.logging.log4j.Logger; + import org.apache.geode.DataSerializer; import org.apache.geode.GemFireException; import org.apache.geode.InternalGemFireError; import org.apache.geode.SystemFailure; -import org.apache.geode.cache.*; +import org.apache.geode.cache.CacheTransactionManager; +import org.apache.geode.cache.CommitConflictException; +import org.apache.geode.cache.TransactionDataRebalancedException; +import org.apache.geode.cache.TransactionId; +import org.apache.geode.cache.TransactionInDoubtException; +import org.apache.geode.cache.TransactionListener; +import org.apache.geode.cache.TransactionWriter; +import org.apache.geode.cache.UnsupportedOperationInTransactionException; import org.apache.geode.distributed.TXManagerCancelledException; -import org.apache.geode.distributed.internal.*; +import org.apache.geode.distributed.internal.DM; +import org.apache.geode.distributed.internal.DistributionConfig; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.HighPriorityDistributionMessage; +import org.apache.geode.distributed.internal.InternalDistributedSystem; +import org.apache.geode.distributed.internal.MembershipListener; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.SystemTimer.SystemTimerTask; import org.apache.geode.internal.cache.tier.sockets.Message; @@ -32,30 +68,14 @@ import org.apache.geode.internal.logging.log4j.LocalizedMessage; import org.apache.geode.internal.util.concurrent.CustomEntryConcurrentHashMap; import org.apache.geode.internal.util.concurrent.CustomEntryConcurrentHashMap.HashEntry; import org.apache.geode.internal.util.concurrent.CustomEntryConcurrentHashMap.MapCallback; -import org.apache.logging.log4j.Logger; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.*; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.locks.LockSupport; /** - * <p> * The internal implementation of the {@link CacheTransactionManager} interface returned by - * {@link GemFireCacheImpl#getCacheTransactionManager}. Internal operations + * {@link InternalCache#getCacheTransactionManager}. Internal operations * - * </code>TransactionListener</code> invocation, Region synchronization, transaction statistics and + * {@code TransactionListener} invocation, Region synchronization, transaction statistics and * * transaction logging are handled here - * * * @since GemFire 4.0 * @@ -67,12 +87,14 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene // Thread specific context container private final ThreadLocal<TXStateProxy> txContext; + private static TXManagerImpl currentInstance = null; + // The unique transaction ID for this Manager private final AtomicInteger uniqId; private final DM dm; - private final Cache cache; + private final InternalCache cache; // The DistributionMemberID used to construct TXId's private final InternalDistributedMember distributionMgrId; @@ -86,8 +108,10 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene */ public static final int NOTX = -1; - private final ArrayList<TransactionListener> txListeners = new ArrayList<TransactionListener>(8); + private final List<TransactionListener> txListeners = new ArrayList<>(8); + public TransactionWriter writer = null; + private volatile boolean closed = false; private final Map<TXId, TXStateProxy> hostedTXStates; @@ -95,7 +119,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene /** * the number of client initiated transactions to store for client failover */ - public final static int FAILOVER_TX_MAP_SIZE = + public static final int FAILOVER_TX_MAP_SIZE = Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "transactionFailoverMapSize", 1000); /** @@ -106,6 +130,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene @SuppressWarnings("unchecked") private Map<TXId, TXCommitMessage> failoverMap = Collections.synchronizedMap(new LinkedHashMap<TXId, TXCommitMessage>() { + // TODO: inner class is serializable but outer class is not private static final long serialVersionUID = -4156018226167594134L; protected boolean removeEldestEntry(Entry eldest) { @@ -114,7 +139,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene eldest.getKey(), (size() > FAILOVER_TX_MAP_SIZE)); } return size() > FAILOVER_TX_MAP_SIZE; - }; + } }); /** @@ -126,8 +151,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene /** * this keeps track of all the transactions that were initiated locally. */ - private ConcurrentMap<TXId, TXStateProxy> localTxMap = - new ConcurrentHashMap<TXId, TXStateProxy>(); + private ConcurrentMap<TXId, TXStateProxy> localTxMap = new ConcurrentHashMap<>(); /** * the time in minutes after which any suspended transaction are rolled back. default is 30 @@ -152,49 +176,44 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene /** * Constructor that implements the {@link CacheTransactionManager} interface. Only only one * instance per {@link org.apache.geode.cache.Cache} - * - * @param cachePerfStats */ - public TXManagerImpl(CachePerfStats cachePerfStats, Cache cache) { + public TXManagerImpl(CachePerfStats cachePerfStats, InternalCache cache) { this.cache = cache; this.dm = ((InternalDistributedSystem) cache.getDistributedSystem()).getDistributionManager(); this.distributionMgrId = this.dm.getDistributionManagerId(); this.uniqId = new AtomicInteger(0); this.cachePerfStats = cachePerfStats; - this.hostedTXStates = new HashMap<TXId, TXStateProxy>(); - this.txContext = new ThreadLocal<TXStateProxy>(); - this.isTXDistributed = new ThreadLocal<Boolean>(); + this.hostedTXStates = new HashMap<>(); + this.txContext = new ThreadLocal<>(); + this.isTXDistributed = new ThreadLocal<>(); this.transactionTimeToLive = Integer .getInteger(DistributionConfig.GEMFIRE_PREFIX + "cacheServer.transactionTimeToLive", 180); currentInstance = this; } - final Cache getCache() { + InternalCache getCache() { return this.cache; } - /** * Get the TransactionWriter for the cache * * @return the current TransactionWriter * @see TransactionWriter */ - public final TransactionWriter getWriter() { + public TransactionWriter getWriter() { return writer; } - - public final void setWriter(TransactionWriter writer) { - if (((GemFireCacheImpl) this.cache).isClient()) { + public void setWriter(TransactionWriter writer) { + if (this.cache.isClient()) { throw new IllegalStateException( LocalizedStrings.TXManager_NO_WRITER_ON_CLIENT.toLocalizedString()); } this.writer = writer; } - - public final TransactionListener getListener() { + public TransactionListener getListener() { synchronized (this.txListeners) { if (this.txListeners.isEmpty()) { return null; @@ -280,7 +299,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene } } - final CachePerfStats getCachePerfStats() { + CachePerfStats getCachePerfStats() { return this.cachePerfStats; } @@ -396,7 +415,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene noteCommitSuccess(opStart, lifeTime, tx); } - final void noteCommitFailure(long opStart, long lifeTime, TXStateInterface tx) { + void noteCommitFailure(long opStart, long lifeTime, TXStateInterface tx) { long opEnd = CachePerfStats.getStatTime(); this.cachePerfStats.txFailure(opEnd - opStart, lifeTime, tx.getChanges()); TransactionListener[] listeners = getListeners(); @@ -428,7 +447,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene } } - final void noteCommitSuccess(long opStart, long lifeTime, TXStateInterface tx) { + void noteCommitSuccess(long opStart, long lifeTime, TXStateInterface tx) { long opEnd = CachePerfStats.getStatTime(); this.cachePerfStats.txSuccess(opEnd - opStart, lifeTime, tx.getChanges()); TransactionListener[] listeners = getListeners(); @@ -497,7 +516,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene noteRollbackSuccess(opStart, lifeTime, tx); } - final void noteRollbackSuccess(long opStart, long lifeTime, TXStateInterface tx) { + void noteRollbackSuccess(long opStart, long lifeTime, TXStateInterface tx) { long opEnd = CachePerfStats.getStatTime(); this.cachePerfStats.txRollback(opEnd - opStart, lifeTime, tx.getChanges()); TransactionListener[] listeners = getListeners(); @@ -597,7 +616,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene return retVal; } - public final void setTXState(TXStateProxy val) { + public void setTXState(TXStateProxy val) { txContext.set(val); } @@ -625,11 +644,9 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene for (TXStateProxy proxy : this.localTxMap.values()) { proxy.close(); } - { - TransactionListener[] listeners = getListeners(); - for (int i = 0; i < listeners.length; i++) { - closeListener(listeners[i]); - } + TransactionListener[] listeners = getListeners(); + for (int i = 0; i < listeners.length; i++) { + closeListener(listeners[i]); } } @@ -660,7 +677,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene * @return the state of the transaction or null. Pass this value to {@link TXManagerImpl#resume} * to reactivate the suspended transaction. */ - public final TXStateProxy internalSuspend() { + public TXStateProxy internalSuspend() { TXStateProxy result = getTXState(); if (result != null) { result.suspend(); @@ -691,26 +708,26 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene /** * @deprecated use internalResume instead */ - public final void resume(TXStateProxy tx) { + @Deprecated + public void resume(TXStateProxy tx) { internalResume(tx); } - public final boolean isClosed() { + public boolean isClosed() { return this.closed; } - private final void checkClosed() { + private void checkClosed() { cache.getCancelCriterion().checkCancelInProgress(null); if (this.closed) { throw new TXManagerCancelledException("This transaction manager is closed."); } } - final DM getDM() { + DM getDM() { return this.dm; } - public static int getCurrentTXUniqueId() { if (currentInstance == null) { return NOTX; @@ -718,9 +735,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene return currentInstance.getMyTXUniqueId(); } - - - public final static TXStateProxy getCurrentTXState() { + public static TXStateProxy getCurrentTXState() { if (currentInstance == null) { return null; } @@ -747,9 +762,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene * on the txState, on which this thread operates. Some messages like SizeMessage should not create * a new txState. * - * @param msg * @return {@link TXStateProxy} the txProxy for the transactional message - * @throws InterruptedException */ public TXStateProxy masqueradeAs(TransactionMessage msg) throws InterruptedException { if (msg.getTXUniqId() == NOTX || !msg.canParticipateInTransaction()) { @@ -828,11 +841,8 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene * on the txState, on which this thread operates. Some messages like SizeMessage should not create * a new txState. * - * @param msg - * @param memberId * @param probeOnly - do not masquerade; just look up the TX state * @return {@link TXStateProxy} the txProxy for the transactional message - * @throws InterruptedException */ public TXStateProxy masqueradeAs(Message msg, InternalDistributedMember memberId, boolean probeOnly) throws InterruptedException { @@ -846,8 +856,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene synchronized (this.hostedTXStates) { val = this.hostedTXStates.get(key); if (val == null) { - // [sjigyasu] TODO: Conditionally create object based on distributed or non-distributed tx - // mode + // TODO: Conditionally create object based on distributed or non-distributed tx mode if (msg instanceof TransactionMessage && ((TransactionMessage) msg).isTransactionDistributed()) { val = new DistTXStateProxyImplOnDatanode(this, key, memberId); @@ -894,8 +903,6 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene /** * Remove the association created by {@link #masqueradeAs(TransactionMessage)} - * - * @param tx */ public void unmasquerade(TXStateProxy tx) { if (tx != null) { @@ -907,7 +914,6 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene /** * Cleanup the remote txState after commit and rollback * - * @param txId * @return the TXStateProxy */ public TXStateProxy removeHostedTXState(TXId txId) { @@ -942,7 +948,6 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene /** * Used to verify if a transaction with a given id is hosted by this txManager. * - * @param txId * @return true if the transaction is in progress, false otherwise */ public boolean isHostedTxInProgress(TXId txId) { @@ -1104,7 +1109,6 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene * If the given transaction is already being completed by another thread this will wait for that * completion to finish and will ensure that the result is saved in the client failover map. * - * @param txId * @return true if a wait was performed */ public boolean waitForCompletingTransaction(TXId txId) { @@ -1132,7 +1136,6 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene /** * Returns the TXCommitMessage for a transaction that has been successfully completed. * - * @param txId * @return the commit message or an exception token e.g {@link TXCommitMessage#CMT_CONFLICT_MSG} * if the transaction threw an exception * @see #isExceptionToken(TXCommitMessage) @@ -1142,7 +1145,6 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene } /** - * @param msg * @return true if msg is an exception token, false otherwise */ public boolean isExceptionToken(TXCommitMessage msg) { @@ -1158,7 +1160,6 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene * during transaction execution. * * @param msg the token that represents the exception - * @param txId * @return the exception */ public RuntimeException getExceptionForToken(TXCommitMessage msg, TXId txId) { @@ -1209,13 +1210,12 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene @Override protected void process(DistributionManager dm) { - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); + InternalCache cache = GemFireCacheImpl.getInstance(); if (cache != null) { TXManagerImpl mgr = cache.getTXMgr(); mgr.removeTransactions(this.txIds, false); } } - } private ConcurrentMap<TransactionId, TXStateProxy> suspendedTXs = @@ -1290,8 +1290,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene SystemTimerTask task = this.expiryTasks.remove(txProxy.getTransactionId()); if (task != null) { if (task.cancel()) { - GemFireCacheImpl cache = (GemFireCacheImpl) this.cache; - cache.purgeCCPTimer(); + this.cache.purgeCCPTimer(); } } } @@ -1300,8 +1299,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene * this map keeps track of all the threads that are waiting in * {@link #tryResume(TransactionId, long, TimeUnit)} for a particular transactionId */ - private ConcurrentMap<TransactionId, Queue<Thread>> waitMap = - new ConcurrentHashMap<TransactionId, Queue<Thread>>(); + private ConcurrentMap<TransactionId, Queue<Thread>> waitMap = new ConcurrentHashMap<>(); public boolean tryResume(TransactionId transactionId, long time, TimeUnit unit) { if (transactionId == null || getTXState() != null || !exists(transactionId)) { @@ -1383,11 +1381,9 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene /** * schedules the transaction to expire after {@link #suspendedTXTimeout} * - * @param txId * @param expiryTimeUnit the time unit to use when scheduling the expiration */ private void scheduleExpiry(TransactionId txId, TimeUnit expiryTimeUnit) { - final GemFireCacheImpl cache = (GemFireCacheImpl) this.cache; if (suspendedTXTimeout < 0) { if (logger.isDebugEnabled()) { logger.debug("TX: transaction: {} not scheduled to expire", txId); @@ -1452,9 +1448,13 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene } private static class RefCountMapEntry implements HashEntry<AbstractRegionEntry, RefCountMapEntry> { + private final AbstractRegionEntry key; + private HashEntry<AbstractRegionEntry, RefCountMapEntry> next; + private volatile int refCount; + private static final AtomicIntegerFieldUpdater<RefCountMapEntry> refCountUpdater = AtomicIntegerFieldUpdater.newUpdater(RefCountMapEntry.class, "refCount"); @@ -1561,7 +1561,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene } }; - public static final void incRefCount(AbstractRegionEntry re) { + public static void incRefCount(AbstractRegionEntry re) { TXManagerImpl mgr = currentInstance; if (mgr != null) { mgr.refCountMap.create(re, incCallback, null, null, true); @@ -1571,7 +1571,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene /** * Return true if refCount went to zero. */ - public static final boolean decRefCount(AbstractRegionEntry re) { + public static boolean decRefCount(AbstractRegionEntry re) { TXManagerImpl mgr = currentInstance; if (mgr != null) { return mgr.refCountMap.removeConditionally(re, decCallback, null, null) != null; @@ -1628,9 +1628,9 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene } }; try { - ((GemFireCacheImpl) this.cache).getCCPTimer().schedule(task, timeout); + this.cache.getCCPTimer().schedule(task, timeout); } catch (IllegalStateException ise) { - if (!((GemFireCacheImpl) this.cache).isClosed()) { + if (!this.cache.isClosed()) { throw ise; } // task not able to be scheduled due to cache is closing, @@ -1716,7 +1716,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene LocalizedStrings.TXManagerImpl_CANNOT_CHANGE_TRANSACTION_MODE_WHILE_TRANSACTIONS_ARE_IN_PROGRESS .toLocalizedString()); } else { - isTXDistributed.set(new Boolean(flag)); + isTXDistributed.set(flag); } } @@ -1726,14 +1726,13 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene * default value of this property. */ public boolean isDistributed() { - Boolean value = isTXDistributed.get(); // This can be null if not set in setDistributed(). if (value == null) { InternalDistributedSystem ids = (InternalDistributedSystem) cache.getDistributedSystem(); return ids.getOriginalConfig().getDistributedTransactions(); } else { - return value.booleanValue(); + return value; } } http://git-wip-us.apache.org/repos/asf/geode/blob/d319d129/geode-core/src/main/java/org/apache/geode/internal/cache/TXMessage.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXMessage.java index fd53fb1..24cbaa2 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXMessage.java @@ -39,20 +39,20 @@ import org.apache.geode.internal.cache.partitioned.PartitionMessage; import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.LogService; -/** - * - * - */ public abstract class TXMessage extends SerialDistributionMessage implements MessageWithReply, TransactionMessage { private static final Logger logger = LogService.getLogger(); private int processorId; + private int txUniqId; + private InternalDistributedMember txMemberId = null; - public TXMessage() {} + public TXMessage() { + // nothing + } public TXMessage(int txUniqueId, InternalDistributedMember onBehalfOfMember, ReplyProcessor21 processor) { @@ -73,7 +73,7 @@ public abstract class TXMessage extends SerialDistributionMessage if (logger.isDebugEnabled()) { logger.debug("processing {}", this); } - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); + InternalCache cache = GemFireCacheImpl.getInstance(); if (checkCacheClosing(cache) || checkDSClosing(cache.getInternalDistributedSystem())) { thr = new CacheClosedException(LocalizedStrings.PartitionMessage_REMOTE_CACHE_IS_CLOSED_0 .toLocalizedString(dm.getId())); @@ -130,7 +130,7 @@ public abstract class TXMessage extends SerialDistributionMessage return distributedSystem == null || distributedSystem.isDisconnecting(); } - private boolean checkCacheClosing(GemFireCacheImpl cache) { + private boolean checkCacheClosing(InternalCache cache) { return cache == null || cache.isClosed(); } @@ -160,7 +160,7 @@ public abstract class TXMessage extends SerialDistributionMessage * Transaction operations override this method to do actual work * * @param txId The transaction Id to operate on - * @return true if {@link TXMessage} should send a reply false otherwise + * @return true if TXMessage should send a reply false otherwise */ protected abstract boolean operateOnTx(TXId txId, DistributionManager dm) throws RemoteOperationException; @@ -192,7 +192,6 @@ public abstract class TXMessage extends SerialDistributionMessage return txMemberId; } - @Override public int getProcessorId() { return this.processorId; http://git-wip-us.apache.org/repos/asf/geode/blob/d319d129/geode-core/src/main/java/org/apache/geode/internal/cache/TXRegionLockRequestImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXRegionLockRequestImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXRegionLockRequestImpl.java index eefa27c..1e586aa 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXRegionLockRequestImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXRegionLockRequestImpl.java @@ -12,7 +12,6 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ - package org.apache.geode.internal.cache; import java.io.DataInput; @@ -26,23 +25,23 @@ import org.apache.logging.log4j.Logger; import org.apache.geode.DataSerializer; import org.apache.geode.cache.CacheClosedException; import org.apache.geode.internal.InternalDataSerializer; +import org.apache.geode.internal.cache.locks.TXRegionLockRequest; import org.apache.geode.internal.logging.LogService; import org.apache.geode.internal.logging.log4j.LogMarker; /** * TXRegionLockRequest represents all the locks that need to be made for a single region. * - * * @since GemFire 4.0 - * */ -public class TXRegionLockRequestImpl - implements org.apache.geode.internal.cache.locks.TXRegionLockRequest { +public class TXRegionLockRequestImpl implements TXRegionLockRequest { private static final long serialVersionUID = 5840033961584078082L; private static final Logger logger = LogService.getLogger(); private transient LocalRegion r; + private String regionPath; + private Set<Object> entryKeys; public TXRegionLockRequestImpl() { @@ -93,26 +92,26 @@ public class TXRegionLockRequestImpl this.entryKeys.add(key); } - public final void fromData(DataInput in) throws IOException, ClassNotFoundException { + public void fromData(DataInput in) throws IOException, ClassNotFoundException { this.regionPath = DataSerializer.readString(in); - final GemFireCacheImpl cache = getCache(false); + final InternalCache cache = getCache(false); try { final int size = InternalDataSerializer.readArrayLength(in); if (cache != null && size > 0) { this.r = (LocalRegion) cache.getRegion(this.regionPath); } this.entryKeys = readEntryKeySet(size, in); - } catch (CacheClosedException cce) { + } catch (CacheClosedException ignore) { // don't throw in deserialization this.entryKeys = null; } } - private final Set<Object> readEntryKeySet(final int size, final DataInput in) + private Set<Object> readEntryKeySet(final int size, final DataInput in) throws IOException, ClassNotFoundException { - if (logger.isDebugEnabled()) { + if (logger.isTraceEnabled()) { logger.trace(LogMarker.SERIALIZER, "Reading HashSet with size {}", size); } @@ -135,21 +134,21 @@ public class TXRegionLockRequestImpl InternalDataSerializer.writeSet(this.entryKeys, out); } - public static final TXRegionLockRequestImpl createFromData(DataInput in) + public static TXRegionLockRequestImpl createFromData(DataInput in) throws IOException, ClassNotFoundException { TXRegionLockRequestImpl result = new TXRegionLockRequestImpl(); InternalDataSerializer.invokeFromData(result, in); return result; } - public final String getRegionFullPath() { + public String getRegionFullPath() { if (this.regionPath == null) { this.regionPath = this.r.getFullPath(); } return this.regionPath; } - public final Set<Object> getKeys() { + public Set<Object> getKeys() { if (this.entryKeys == null) { // check for cache closed/closing getCache(true); @@ -157,8 +156,8 @@ public class TXRegionLockRequestImpl return this.entryKeys; } - private final GemFireCacheImpl getCache(boolean throwIfClosing) { - final GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); + private InternalCache getCache(boolean throwIfClosing) { + final InternalCache cache = GemFireCacheImpl.getInstance(); if (cache != null && !cache.isClosed()) { if (throwIfClosing) { cache.getCancelCriterion().checkCancelInProgress(null); @@ -175,7 +174,7 @@ public class TXRegionLockRequestImpl * Only safe to call in the vm that creates this request. Once it is serialized this method will * return null. */ - public final LocalRegion getLocalRegion() { + public LocalRegion getLocalRegion() { return this.r; } http://git-wip-us.apache.org/repos/asf/geode/blob/d319d129/geode-core/src/main/java/org/apache/geode/internal/cache/TXRegionState.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXRegionState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXRegionState.java index 167f1c1..496a812 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXRegionState.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXRegionState.java @@ -97,7 +97,7 @@ public class TXRegionState { public TXEntryState createReadEntry(LocalRegion r, Object entryKey, RegionEntry re, Object vId, Object pendingValue) { - GemFireCacheImpl cache = r.getCache(); + InternalCache cache = r.getCache(); boolean isDistributed = false; if (cache.getTxManager().getTXState() != null) { isDistributed = cache.getTxManager().getTXState().isDistTx(); http://git-wip-us.apache.org/repos/asf/geode/blob/d319d129/geode-core/src/main/java/org/apache/geode/internal/cache/TXRemoteCommitMessage.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXRemoteCommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXRemoteCommitMessage.java index 725ad64..6a1eeed 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXRemoteCommitMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXRemoteCommitMessage.java @@ -40,16 +40,13 @@ import org.apache.geode.internal.cache.RemoteOperationMessage.RemoteOperationRes import org.apache.geode.internal.logging.LogService; import org.apache.geode.internal.logging.log4j.LogMarker; -/** - * - * - */ public class TXRemoteCommitMessage extends TXMessage { - private static final Logger logger = LogService.getLogger(); /** for deserialization */ - public TXRemoteCommitMessage() {} + public TXRemoteCommitMessage() { + // nothing + } public TXRemoteCommitMessage(int txUniqId, InternalDistributedMember onBehalfOfClientMember, ReplyProcessor21 processor) { @@ -76,7 +73,7 @@ public class TXRemoteCommitMessage extends TXMessage { @Override protected boolean operateOnTx(TXId txId, DistributionManager dm) throws RemoteOperationException { - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); + InternalCache cache = GemFireCacheImpl.getInstance(); TXManagerImpl txMgr = cache.getTXMgr(); if (logger.isDebugEnabled()) { @@ -124,8 +121,6 @@ public class TXRemoteCommitMessage extends TXMessage { return true; } - - /** * This message is used for the reply to a remote commit operation: a commit from a stub to the tx * host. This is the reply to a {@link TXRemoteCommitMessage}. @@ -133,7 +128,9 @@ public class TXRemoteCommitMessage extends TXMessage { * @since GemFire 6.5 */ public static final class TXRemoteCommitReplyMessage extends ReplyMessage { + private transient TXCommitMessage commitMessage; + /* * Used on the fromData side to transfer the value bytes to the requesting thread */ @@ -142,7 +139,9 @@ public class TXRemoteCommitMessage extends TXMessage { /** * Empty constructor to conform to DataSerializable interface */ - public TXRemoteCommitReplyMessage() {} + public TXRemoteCommitReplyMessage() { + // nothing + } public TXRemoteCommitReplyMessage(DataInput in) throws IOException, ClassNotFoundException { fromData(in); @@ -219,7 +218,7 @@ public class TXRemoteCommitMessage extends TXMessage { @Override public String toString() { - StringBuffer sb = new StringBuffer(); + StringBuilder sb = new StringBuilder(); sb.append("TXRemoteCommitReplyMessage ").append("processorid=").append(this.processorId) .append(" reply to sender ").append(this.getSender()); return sb.toString(); http://git-wip-us.apache.org/repos/asf/geode/blob/d319d129/geode-core/src/main/java/org/apache/geode/internal/cache/TXRemoteRollbackMessage.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXRemoteRollbackMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXRemoteRollbackMessage.java index e416e11..13b783f 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXRemoteRollbackMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXRemoteRollbackMessage.java @@ -30,15 +30,12 @@ import org.apache.geode.distributed.internal.membership.InternalDistributedMembe import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.LogService; -/** - * - * - */ public class TXRemoteRollbackMessage extends TXMessage { - private static final Logger logger = LogService.getLogger(); - public TXRemoteRollbackMessage() {} + public TXRemoteRollbackMessage() { + // nothing + } public TXRemoteRollbackMessage(int txUniqId, InternalDistributedMember onBehalfOfClientMember, ReplyProcessor21 processor) { @@ -60,7 +57,7 @@ public class TXRemoteRollbackMessage extends TXMessage { @Override protected boolean operateOnTx(TXId txId, DistributionManager dm) { - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); + InternalCache cache = GemFireCacheImpl.getInstance(); if (cache == null) { throw new CacheClosedException( LocalizedStrings.CacheFactory_A_CACHE_HAS_NOT_YET_BEEN_CREATED.toLocalizedString()); http://git-wip-us.apache.org/repos/asf/geode/blob/d319d129/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java index 49922a0..22b95f3 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java @@ -12,9 +12,6 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -/** - * File comment - */ package org.apache.geode.internal.cache; import java.util.Collection; @@ -48,11 +45,7 @@ import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.LogService; import org.apache.geode.internal.logging.log4j.LocalizedMessage; -/** - * - */ public class TXStateProxyImpl implements TXStateProxy { - private static final Logger logger = LogService.getLogger(); protected static final AtomicBoolean txDistributedClientWarningIssued = new AtomicBoolean(); @@ -63,6 +56,7 @@ public class TXStateProxyImpl implements TXStateProxy { protected DistributedMember target; private boolean commitRequestedByOwner; private boolean isJCATransaction; + /** * for client/server JTA transactions we need to have a single thread handle both beforeCompletion * and afterCompletion so that beforeC can obtain locks for the afterC step. This is that thread @@ -88,39 +82,26 @@ public class TXStateProxyImpl implements TXStateProxy { return this.synchRunnable; } - /* - * (non-Javadoc) - * - * @see org.apache.geode.internal.cache.TXStateInterface#getSemaphore() - */ public ReentrantLock getLock() { return this.lock; } - - /** - * @return the isJTA - */ final boolean isJTA() { return isJTA; } - /** - * @return the txId - */ final public TXId getTxId() { return txId; } - /** - * @return the txMgr - */ public final TXManagerImpl getTxMgr() { return txMgr; } protected volatile TXStateInterface realDeal; + protected boolean inProgress = true; + protected InternalDistributedMember onBehalfOfClientMember = null; /** @@ -184,10 +165,6 @@ public class TXStateProxyImpl implements TXStateProxy { return this.realDeal; } - /** - * @param managerImpl - * @param id - */ public TXStateProxyImpl(TXManagerImpl managerImpl, TXId id, InternalDistributedMember clientMember) { this.txMgr = managerImpl; @@ -196,11 +173,6 @@ public class TXStateProxyImpl implements TXStateProxy { this.onBehalfOfClientMember = clientMember; } - /** - * @param managerImpl - * @param id - * @param isjta - */ public TXStateProxyImpl(TXManagerImpl managerImpl, TXId id, boolean isjta) { this.txMgr = managerImpl; this.txId = id; @@ -219,16 +191,6 @@ public class TXStateProxyImpl implements TXStateProxy { this.isJTA = isJTA; } - /* - * (non-Javadoc) - * - * @see org.apache.geode.internal.cache.TXStateInterface#checkJTA(java.lang.String) - */ - /* - * (non-Javadoc) - * - * @see org.apache.geode.internal.cache.TXStateProxyInterface#checkJTA(java.lang.String) - */ public void checkJTA(String errmsg) throws IllegalStateException { if (isJTA()) { throw new IllegalStateException(errmsg); @@ -243,11 +205,6 @@ public class TXStateProxyImpl implements TXStateProxy { .toLocalizedString("precommit")); } - /* - * (non-Javadoc) - * - * @see org.apache.geode.internal.cache.TXStateInterface#commit() - */ public void commit() throws CommitConflictException { boolean preserveTx = false; try { @@ -284,12 +241,6 @@ public class TXStateProxyImpl implements TXStateProxy { return (TransactionException) e; } - /* - * (non-Javadoc) - * - * @see org.apache.geode.internal.cache.TXStateInterface#containsValueForKey(java.lang.Object, - * org.apache.geode.internal.cache.LocalRegion) - */ public boolean containsValueForKey(KeyInfo keyInfo, LocalRegion region) { try { this.operationCount++; @@ -302,7 +253,6 @@ public class TXStateProxyImpl implements TXStateProxy { } private void trackBucketForTx(KeyInfo keyInfo) { - GemFireCacheImpl cache = (GemFireCacheImpl) txMgr.getCache(); if (keyInfo.getBucketId() >= 0) { if (logger.isDebugEnabled()) { logger.debug("adding bucket:{} for tx:{}", keyInfo.getBucketId(), getTransactionId()); @@ -313,13 +263,6 @@ public class TXStateProxyImpl implements TXStateProxy { } } - /* - * (non-Javadoc) - * - * @see - * org.apache.geode.internal.cache.TXStateInterface#destroyExistingEntry(org.apache.geode.internal - * .cache.EntryEventImpl, boolean, java.lang.Object) - */ public void destroyExistingEntry(EntryEventImpl event, boolean cacheWrite, Object expectedOldValue) throws EntryNotFoundException { try { @@ -332,40 +275,19 @@ public class TXStateProxyImpl implements TXStateProxy { } } - /* - * (non-Javadoc) - * - * @see org.apache.geode.internal.cache.TXStateInterface#getBeginTime() - */ public long getBeginTime() { return getRealDeal(null, null).getBeginTime(); } - /* - * (non-Javadoc) - * - * @see org.apache.geode.internal.cache.TXStateInterface#getCache() - */ public Cache getCache() { return txMgr.getCache(); } - /* - * (non-Javadoc) - * - * @see org.apache.geode.internal.cache.TXStateInterface#getChanges() - */ public int getChanges() { assertBootstrapped(); return getRealDeal(null, null).getChanges(); } - /* - * (non-Javadoc) - * - * @see org.apache.geode.internal.cache.TXStateInterface#getDeserializedValue(java.lang.Object, - * org.apache.geode.internal.cache.LocalRegion, boolean) - */ public Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion, boolean updateStats, boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent, boolean returnTombstones, boolean retainResult) { @@ -379,12 +301,6 @@ public class TXStateProxyImpl implements TXStateProxy { return val; } - /* - * (non-Javadoc) - * - * @see org.apache.geode.internal.cache.TXStateInterface#getEntry(java.lang.Object, - * org.apache.geode.internal.cache.LocalRegion) - */ public Entry getEntry(KeyInfo keyInfo, LocalRegion region, boolean allowTombstones) { try { this.operationCount++; @@ -396,51 +312,25 @@ public class TXStateProxyImpl implements TXStateProxy { } } - /* - * (non-Javadoc) - * - * @see org.apache.geode.internal.cache.TXStateInterface#getEvent() - */ public TXEvent getEvent() { assertBootstrapped(); return getRealDeal(null, null).getEvent(); } - /* - * (non-Javadoc) - * - * @see org.apache.geode.internal.cache.TXStateInterface#getEvents() - */ public List getEvents() { assertBootstrapped(); return getRealDeal(null, null).getEvents(); } - /* - * (non-Javadoc) - * - * @see org.apache.geode.internal.cache.TXStateInterface#getRegions() - */ public Collection<LocalRegion> getRegions() { assertBootstrapped(); return getRealDeal(null, null).getRegions(); } - /* - * (non-Javadoc) - * - * @see org.apache.geode.internal.cache.TXStateInterface#getTransactionId() - */ public TransactionId getTransactionId() { return txId; } - /* - * (non-Javadoc) - * - * @see org.apache.geode.internal.cache.TXStateInterface#invalidateExistingEntry(org.apache.geode. - * internal.cache.EntryEventImpl, boolean, boolean) - */ public void invalidateExistingEntry(EntryEventImpl event, boolean invokeCallbacks, boolean forceNewEntry) { try { @@ -453,11 +343,6 @@ public class TXStateProxyImpl implements TXStateProxy { } } - /* - * (non-Javadoc) - * - * @see org.apache.geode.internal.cache.TXStateInterface#isInProgress() - */ public boolean isInProgress() { return inProgress; } @@ -467,54 +352,26 @@ public class TXStateProxyImpl implements TXStateProxy { this.inProgress = progress; } - /* - * (non-Javadoc) - * - * @see org.apache.geode.internal.cache.TXStateInterface#needsLargeModCount() - */ public boolean needsLargeModCount() { assertBootstrapped(); return getRealDeal(null, null).needsLargeModCount(); } - /* - * (non-Javadoc) - * - * @see org.apache.geode.internal.cache.TXStateInterface#nextModSerialNum() - */ public int nextModSerialNum() { assertBootstrapped(); return getRealDeal(null, null).nextModSerialNum(); } - /* - * (non-Javadoc) - * - * @see - * org.apache.geode.internal.cache.TXStateInterface#readRegion(org.apache.geode.internal.cache. - * LocalRegion) - */ public TXRegionState readRegion(LocalRegion r) { assertBootstrapped(); return getRealDeal(null, r).readRegion(r); } - /* - * (non-Javadoc) - * - * @see org.apache.geode.internal.cache.TXStateInterface#rmRegion(org.apache.geode.internal.cache. - * LocalRegion) - */ public void rmRegion(LocalRegion r) { assertBootstrapped(); getRealDeal(null, r).rmRegion(r); } - /* - * (non-Javadoc) - * - * @see org.apache.geode.internal.cache.TXStateInterface#rollback() - */ public void rollback() { try { getRealDeal(null, null).rollback(); @@ -526,13 +383,6 @@ public class TXStateProxyImpl implements TXStateProxy { } } - /* - * (non-Javadoc) - * - * @see - * org.apache.geode.internal.cache.TXStateInterface#txPutEntry(org.apache.geode.internal.cache. - * EntryEventImpl, boolean, boolean, boolean) - */ public boolean txPutEntry(EntryEventImpl event, boolean ifNew, boolean requireOldValue, boolean checkResources, Object expectedOldValue) { try { @@ -546,12 +396,6 @@ public class TXStateProxyImpl implements TXStateProxy { } } - /* - * (non-Javadoc) - * - * @see org.apache.geode.internal.cache.TXStateInterface#txReadEntry(java.lang.Object, - * org.apache.geode.internal.cache.LocalRegion, boolean) - */ public TXEntryState txReadEntry(KeyInfo keyInfo, LocalRegion localRegion, boolean rememberRead, boolean createTxEntryIfAbsent) { try { @@ -565,36 +409,15 @@ public class TXStateProxyImpl implements TXStateProxy { } } - /* - * (non-Javadoc) - * - * @see - * org.apache.geode.internal.cache.TXStateInterface#txReadRegion(org.apache.geode.internal.cache. - * LocalRegion) - */ public TXRegionState txReadRegion(LocalRegion localRegion) { assertBootstrapped(); return getRealDeal(null, localRegion).txReadRegion(localRegion); } - /* - * (non-Javadoc) - * - * @see - * org.apache.geode.internal.cache.TXStateInterface#txWriteRegion(org.apache.geode.internal.cache. - * LocalRegion, java.lang.Object) - */ public TXRegionState txWriteRegion(LocalRegion localRegion, KeyInfo entryKey) { return getRealDeal(entryKey, localRegion).txWriteRegion(localRegion, entryKey); } - /* - * (non-Javadoc) - * - * @see - * org.apache.geode.internal.cache.TXStateInterface#writeRegion(org.apache.geode.internal.cache. - * LocalRegion) - */ public TXRegionState writeRegion(LocalRegion r) { assertBootstrapped(); return getRealDeal(null, r).writeRegion(r); @@ -604,11 +427,6 @@ public class TXStateProxyImpl implements TXStateProxy { assert realDeal != null; } - /* - * (non-Javadoc) - * - * @see javax.transaction.Synchronization#afterCompletion(int) - */ public void afterCompletion(int status) { assertBootstrapped(); try { @@ -621,22 +439,11 @@ public class TXStateProxyImpl implements TXStateProxy { } } - /* - * (non-Javadoc) - * - * @see javax.transaction.Synchronization#beforeCompletion() - */ public void beforeCompletion() { assertBootstrapped(); getRealDeal(null, null).beforeCompletion(); } - /* - * (non-Javadoc) - * - * @see org.apache.geode.internal.cache.InternalDataView#containsKey(java.lang.Object, - * org.apache.geode.internal.cache.LocalRegion) - */ public boolean containsKey(KeyInfo keyInfo, LocalRegion localRegion) { try { this.operationCount++; @@ -648,13 +455,6 @@ public class TXStateProxyImpl implements TXStateProxy { } } - /* - * (non-Javadoc) - * - * @see - * org.apache.geode.internal.cache.InternalDataView#entryCount(org.apache.geode.internal.cache. - * LocalRegion) - */ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "UL_UNRELEASED_LOCK", justification = "This method unlocks and then conditionally undoes the unlock in the finally-block. Review again at later time.") public int entryCount(LocalRegion localRegion) { @@ -684,13 +484,6 @@ public class TXStateProxyImpl implements TXStateProxy { } } - /* - * (non-Javadoc) - * - * @see - * org.apache.geode.internal.cache.InternalDataView#findObject(org.apache.geode.internal.cache. - * LocalRegion, java.lang.Object, java.lang.Object, boolean, boolean, java.lang.Object) - */ public Object findObject(KeyInfo key, LocalRegion r, boolean isCreate, boolean generateCallbacks, Object value, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, @@ -706,13 +499,6 @@ public class TXStateProxyImpl implements TXStateProxy { } } - /* - * (non-Javadoc) - * - * @see - * org.apache.geode.internal.cache.InternalDataView#getAdditionalKeysForIterator(org.apache.geode. - * internal.cache.LocalRegion) - */ public Set getAdditionalKeysForIterator(LocalRegion currRgn) { if (this.realDeal == null) { return null; @@ -720,13 +506,6 @@ public class TXStateProxyImpl implements TXStateProxy { return getRealDeal(null, currRgn).getAdditionalKeysForIterator(currRgn); } - /* - * (non-Javadoc) - * - * @see - * org.apache.geode.internal.cache.InternalDataView#getEntryForIterator(org.apache.geode.internal. - * cache.LocalRegion, java.lang.Object, boolean) - */ public Object getEntryForIterator(KeyInfo key, LocalRegion currRgn, boolean rememberReads, boolean allowTombstones) { boolean resetTxState = this.realDeal == null; @@ -745,15 +524,8 @@ public class TXStateProxyImpl implements TXStateProxy { getTxMgr().internalResume(txp); } } - } - /* - * (non-Javadoc) - * - * @see org.apache.geode.internal.cache.InternalDataView#getKeyForIterator(java.lang.Object, - * org.apache.geode.internal.cache.LocalRegion, boolean) - */ public Object getKeyForIterator(KeyInfo keyInfo, LocalRegion currRgn, boolean rememberReads, boolean allowTombstones) { boolean resetTxState = this.realDeal == null; @@ -775,33 +547,16 @@ public class TXStateProxyImpl implements TXStateProxy { } } - /* - * (non-Javadoc) - * - * @see org.apache.geode.internal.cache.InternalDataView#getValueInVM(java.lang.Object, - * org.apache.geode.internal.cache.LocalRegion, boolean) - */ public Object getValueInVM(KeyInfo keyInfo, LocalRegion localRegion, boolean rememberRead) { this.operationCount++; return getRealDeal(keyInfo, localRegion).getValueInVM(keyInfo, localRegion, rememberRead); } - /* - * (non-Javadoc) - * - * @see org.apache.geode.internal.cache.InternalDataView#isDeferredStats() - */ public boolean isDeferredStats() { assertBootstrapped(); return getRealDeal(null, null).isDeferredStats(); } - /* - * (non-Javadoc) - * - * @see org.apache.geode.internal.cache.InternalDataView#putEntry(org.apache.geode.internal.cache. - * EntryEventImpl, boolean, boolean, java.lang.Object, boolean, long, boolean) - */ public boolean putEntry(EntryEventImpl event, boolean ifNew, boolean ifOld, Object expectedOldValue, boolean requireOldValue, long lastModified, boolean overwriteDestroyed) { @@ -816,34 +571,14 @@ public class TXStateProxyImpl implements TXStateProxy { } } - /* - * (non-Javadoc) - * - * @see org.apache.geode.internal.cache.TXStateInterface#isInProgressAndSameAs(org.apache.geode. - * internal.cache.TXStateInterface) - */ public boolean isInProgressAndSameAs(TXStateInterface otherState) { return isInProgress() && otherState == this; } - /* - * (non-Javadoc) - * - * @see - * org.apache.geode.internal.cache.TXStateProxy#setLocalTXState(org.apache.geode.internal.cache. - * TXState) - */ public void setLocalTXState(TXStateInterface state) { this.realDeal = state; } - /* - * (non-Javadoc) - * - * @see - * org.apache.geode.internal.cache.InternalDataView#getSerializedValue(org.apache.geode.internal. - * cache.LocalRegion, java.lang.Object, java.lang.Object) - */ public Object getSerializedValue(LocalRegion localRegion, KeyInfo key, boolean doNotLockEntry, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones) throws DataLocationException { @@ -852,13 +587,6 @@ public class TXStateProxyImpl implements TXStateProxy { requestingClient, clientEvent, returnTombstones); } - /* - * (non-Javadoc) - * - * @see - * org.apache.geode.internal.cache.InternalDataView#putEntryOnRemote(org.apache.geode.internal. - * cache.EntryEventImpl, boolean, boolean, java.lang.Object, boolean, long, boolean) - */ public boolean putEntryOnRemote(EntryEventImpl event, boolean ifNew, boolean ifOld, Object expectedOldValue, boolean requireOldValue, long lastModified, boolean overwriteDestroyed) throws DataLocationException { @@ -873,12 +601,6 @@ public class TXStateProxyImpl implements TXStateProxy { return getRealDeal(null, null).isFireCallbacks(); } - /* - * (non-Javadoc) - * - * @see org.apache.geode.internal.cache.InternalDataView#destroyOnRemote(java.lang.Integer, - * org.apache.geode.internal.cache.EntryEventImpl, java.lang.Object) - */ public void destroyOnRemote(EntryEventImpl event, boolean cacheWrite, Object expectedOldValue) throws DataLocationException { this.operationCount++; @@ -887,13 +609,6 @@ public class TXStateProxyImpl implements TXStateProxy { tx.destroyOnRemote(event, cacheWrite, expectedOldValue); } - /* - * (non-Javadoc) - * - * @see - * org.apache.geode.internal.cache.InternalDataView#invalidateOnRemote(org.apache.geode.internal. - * cache.EntryEventImpl, boolean, boolean) - */ public void invalidateOnRemote(EntryEventImpl event, boolean invokeCallbacks, boolean forceNewEntry) throws DataLocationException { this.operationCount++; @@ -919,13 +634,6 @@ public class TXStateProxyImpl implements TXStateProxy { LocalizedStrings.TXState_REGION_CLEAR_NOT_SUPPORTED_IN_A_TRANSACTION.toLocalizedString()); } - /* - * (non-Javadoc) - * - * @see - * org.apache.geode.internal.cache.InternalDataView#getBucketKeys(org.apache.geode.internal.cache. - * LocalRegion, int) - */ public Set getBucketKeys(LocalRegion localRegion, int bucketId, boolean allowTombstones) { // if this the first operation in a transaction, reset txState boolean resetTxState = this.realDeal == null; @@ -945,12 +653,6 @@ public class TXStateProxyImpl implements TXStateProxy { } } - /* - * (non-Javadoc) - * - * @see org.apache.geode.internal.cache.InternalDataView#getEntryOnRemote(java.lang.Object, - * org.apache.geode.internal.cache.LocalRegion) - */ public Entry getEntryOnRemote(KeyInfo keyInfo, LocalRegion localRegion, boolean allowTombstones) throws DataLocationException { this.operationCount++; @@ -963,33 +665,15 @@ public class TXStateProxyImpl implements TXStateProxy { getRealDeal(null, null); } - /* - * (non-Javadoc) - * - * @see org.apache.geode.internal.cache.TXStateProxy#getTarget() - */ public DistributedMember getTarget() { return this.target; } - /* - * (non-Javadoc) - * - * @see org.apache.geode.internal.cache.TXStateProxy#setTarget(org.apache.geode.distributed. - * DistributedMember) - */ public void setTarget(DistributedMember target) { assert this.target == null; getRealDeal(target); } - /* - * (non-Javadoc) - * - * @see - * org.apache.geode.internal.cache.InternalDataView#getRegionKeysForIteration(org.apache.geode. - * internal.cache.LocalRegion) - */ public Collection<?> getRegionKeysForIteration(LocalRegion currRegion) { if (currRegion.isUsedForPartitionedRegionBucket()) { return currRegion.getRegionKeysForIteration(); @@ -998,20 +682,10 @@ public class TXStateProxyImpl implements TXStateProxy { } } - /* - * (non-Javadoc) - * - * @see org.apache.geode.internal.cache.TXStateProxy#isCommitRequestedByOwner() - */ public boolean isCommitOnBehalfOfRemoteStub() { return this.commitRequestedByOwner; } - /* - * (non-Javadoc) - * - * @see org.apache.geode.internal.cache.TXStateProxy#setCommitRequestedByOwner() - */ public boolean setCommitOnBehalfOfRemoteStub(boolean requestedByOwner) { return this.commitRequestedByOwner = requestedByOwner; } @@ -1039,7 +713,8 @@ public class TXStateProxyImpl implements TXStateProxy { public String toString() { StringBuilder builder = new StringBuilder(); builder.append("TXStateProxyImpl@").append(System.identityHashCode(this)).append(" txId:") - .append(this.txId).append(" realDeal:" + this.realDeal).append(" isJTA:").append(isJTA); + .append(this.txId).append(" realDeal:").append(this.realDeal).append(" isJTA:") + .append(isJTA); return builder.toString(); } @@ -1051,7 +726,6 @@ public class TXStateProxyImpl implements TXStateProxy { } } - public boolean isMemberIdForwardingRequired() { if (this.realDeal == null) { return false; @@ -1060,7 +734,6 @@ public class TXStateProxyImpl implements TXStateProxy { } } - public TXCommitMessage getCommitMessage() { if (this.realDeal == null) { return null; @@ -1069,7 +742,6 @@ public class TXStateProxyImpl implements TXStateProxy { } } - public void postPutAll(DistributedPutAllOperation putallOp, VersionedObjectList successfulPuts, LocalRegion region) { if (putallOp.putAllData.length == 0) { @@ -1159,7 +831,6 @@ public class TXStateProxyImpl implements TXStateProxy { // Do nothing. Not applicable for transactions. } - public void close() { if (this.realDeal != null) { this.realDeal.close();