http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java index 2e33889..4b1d846 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java @@ -18,24 +18,45 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import java.util.Collection; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; +import org.apache.ignite.internal.processors.cache.mvcc.MvccResponseListener; +import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.future.GridFutureAdapter; 
import org.apache.ignite.internal.util.lang.GridPlainRunnable; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteInClosure; import org.jetbrains.annotations.Nullable; /** * */ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearTxPrepareFutureAdapter { + /** Atomic updater for the {@code lockCnt} field of {@code MvccVersionFuture}. */ + private static final AtomicIntegerFieldUpdater<MvccVersionFuture> LOCK_CNT_UPD = + AtomicIntegerFieldUpdater.newUpdater(MvccVersionFuture.class, "lockCnt"); + + /** Key lock future; when set, the mvcc version future is registered as its listener. */ + @GridToStringExclude + protected KeyLockFuture keyLockFut; + + /** Mvcc version future created by {@code initMvccVersionFuture}. */ + @GridToStringExclude + protected MvccVersionFuture mvccVerFut; + /** * @param cctx Context. * @param tx Transaction. @@ -169,6 +190,29 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT protected abstract void prepare0(boolean remap, boolean topLocked); /** + * Creates, initializes and registers the mvcc version future, or only re-initializes the existing one on remap. + * + * @param mvccCrd Mvcc coordinator. + * @param lockCnt Expected number of lock responses. + * @param remap {@code False} to create a new version future and add it to this future, {@code true} to re-initialize the already created one. + */ + final void initMvccVersionFuture(MvccCoordinator mvccCrd, int lockCnt, boolean remap) { + if (!remap) { + mvccVerFut = new MvccVersionFuture(); + + mvccVerFut.init(mvccCrd, lockCnt); + + if (keyLockFut != null) + keyLockFut.listen(mvccVerFut); + + add(mvccVerFut); + } + else { + assert mvccVerFut != null; + + mvccVerFut.init(mvccCrd, lockCnt); + } + } + + /** * Keys lock future. 
*/ protected static class KeyLockFuture extends GridFutureAdapter<Void> { @@ -231,4 +275,86 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT return S.toString(KeyLockFuture.class, this, super.toString()); } } + + /** + * Future that counts down the expected lock responses and, once the last one arrives, requests the tx counter + * from the mvcc coordinator; it completes when the coordinator response or error is received. + */ + class MvccVersionFuture extends GridFutureAdapter implements MvccResponseListener, + IgniteInClosure<IgniteInternalFuture<Void>> { + /** Mvcc coordinator, assigned in {@code init}. */ + MvccCoordinator crd; + + /** Number of lock responses still expected; decremented through {@code LOCK_CNT_UPD}. */ + volatile int lockCnt; + + /** Counts a successfully completed key lock future as one received lock; a failure is only logged at debug level. */ @Override public void apply(IgniteInternalFuture<Void> keyLockFut) { + try { + keyLockFut.get(); + + onLockReceived(); + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("MvccVersionFuture ignores key lock future failure: " + e); + } + } + + /** + * @param crd Mvcc coordinator. + * @param lockCnt Expected number of lock responses. + */ + void init(MvccCoordinator crd, int lockCnt) { + assert crd != null; + assert lockCnt > 0; + + this.crd = crd; + this.lockCnt = lockCnt; + + assert !isDone(); + } + + /** + * Decrements the expected lock counter; when it reaches zero, requests the tx counter from the coordinator + * (directly when the coordinator is the local node, otherwise through an asynchronous request). + */ + void onLockReceived() { + int remaining = LOCK_CNT_UPD.decrementAndGet(this); + + assert remaining >= 0 : remaining; + + if (remaining == 0) { + // TODO IGNITE-3478: add a method that does not create one more future in requestTxCounter. 
+ if (cctx.localNodeId().equals(crd.nodeId())) + onMvccResponse(crd.nodeId(), cctx.coordinators().requestTxCounterOnCoordinator(tx)); + else + cctx.coordinators().requestTxCounter(crd, this, tx.nearXidVersion()); + } + } + + /** {@inheritDoc} */ + @Override public void onMvccResponse(UUID crdId, MvccCoordinatorVersion res) { + tx.mvccInfo(new TxMvccInfo(crdId, res)); + + onDone(); + } + + /** {@inheritDoc} */ + @Override public void onMvccError(IgniteCheckedException e) { + if (e instanceof ClusterTopologyCheckedException) { + IgniteInternalFuture<?> fut = cctx.nextAffinityReadyFuture(tx.topologyVersion()); + + ((ClusterTopologyCheckedException)e).retryReadyFuture(fut); + } + + ERR_UPD.compareAndSet(GridNearOptimisticTxPrepareFutureAdapter.this, null, e); + + onDone(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "MvccVersionFuture [crd=" + crd.nodeId() + + ", lockCnt=" + lockCnt + + ", done=" + isDone() + ']'; + } + } }
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java index 4a2aeb8..ef2c359 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java @@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping; import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor; import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorFuture; import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; import org.apache.ignite.internal.processors.cache.mvcc.MvccResponseListener; import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo; @@ -301,7 +302,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA mvccCrd = cacheCtx.affinity().mvccCoordinator(topVer); if (mvccCrd == null) { - onDone(new IgniteCheckedException("Mvcc coordinator is not assigned: " + topVer)); + onDone(CacheCoordinatorsProcessor.noCoordinatorError(topVer)); return; } @@ -456,6 +457,12 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA /** {@inheritDoc} */ @Override public void onMvccError(IgniteCheckedException e) { 
+ if (e instanceof ClusterTopologyCheckedException) { + IgniteInternalFuture<?> fut = cctx.nextAffinityReadyFuture(tx.topologyVersion()); + + ((ClusterTopologyCheckedException)e).retryReadyFuture(fut); + } + ERR_UPD.compareAndSet(GridNearPessimisticTxPrepareFuture.this, null, e); } @@ -492,12 +499,11 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA ", loc=" + ((MiniFuture)f).primary().isLocal() + ", done=" + f.isDone() + "]"; } - else if (f instanceof CacheCoordinatorsProcessor.MvccVersionFuture) { - CacheCoordinatorsProcessor.MvccVersionFuture crdFut = - (CacheCoordinatorsProcessor.MvccVersionFuture)f; + else if (f instanceof MvccCoordinatorFuture) { + MvccCoordinatorFuture crdFut = (MvccCoordinatorFuture)f; - return "[mvccCrdNode=" + crdFut.crdId + - ", loc=" + crdFut.crdId.equals(cctx.localNodeId()) + + return "[mvccCrdNode=" + crdFut.coordinatorNodeId() + + ", loc=" + crdFut.coordinatorNodeId().equals(cctx.localNodeId()) + ", done=" + f.isDone() + "]"; } else http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java index 00ff4bb..104a31a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java @@ -26,11 +26,13 @@ import org.apache.ignite.internal.processors.cache.GridCacheDeployable; import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; import 
org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; /** * @@ -81,6 +83,9 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid /** TTL for read operation. */ private long accessTtl; + /** */ + private MvccCoordinatorVersion mvccVer; + /** * Empty constructor required for {@link Message}. */ @@ -103,6 +108,7 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid * @param addReader Add reader flag. * @param needVer {@code True} if entry version is needed. * @param addDepInfo Deployment info. + * @param mvccVer Mvcc version. */ public GridNearSingleGetRequest( int cacheId, @@ -118,7 +124,8 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid boolean addReader, boolean needVer, boolean addDepInfo, - boolean recovery + boolean recovery, + MvccCoordinatorVersion mvccVer ) { assert key != null; @@ -131,6 +138,7 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid this.createTtl = createTtl; this.accessTtl = accessTtl; this.addDepInfo = addDepInfo; + this.mvccVer = mvccVer; if (readThrough) flags |= READ_THROUGH_FLAG_MASK; @@ -149,6 +157,13 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid } /** + * @return Mvcc version. + */ + @Nullable public MvccCoordinatorVersion mvccVersion() { + return mvccVer; + } + + /** * @return Key. 
*/ public KeyCacheObject key() { @@ -322,7 +337,7 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid reader.incrementState(); case 8: - subjId = reader.readUuid("subjId"); + mvccVer = reader.readMessage("mvccVer"); if (!reader.isLastRead()) return false; @@ -330,7 +345,7 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid reader.incrementState(); case 9: - taskNameHash = reader.readInt("taskNameHash"); + subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) return false; @@ -338,6 +353,14 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid reader.incrementState(); case 10: + taskNameHash = reader.readInt("taskNameHash"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 11: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -396,18 +419,24 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid writer.incrementState(); case 8: - if (!writer.writeUuid("subjId", subjId)) + if (!writer.writeMessage("mvccVer", mvccVer)) return false; writer.incrementState(); case 9: - if (!writer.writeInt("taskNameHash", taskNameHash)) + if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); case 10: + if (!writer.writeInt("taskNameHash", taskNameHash)) + return false; + + writer.incrementState(); + + case 11: if (!writer.writeMessage("topVer", topVer)) return false; @@ -430,7 +459,7 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 11; + return 12; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java index c24551b..36efe2f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker; import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -53,12 +54,21 @@ public class GridNearTxFinishAndAckFuture extends GridFutureAdapter<IgniteIntern @Override public void apply(final GridNearTxFinishFuture fut) { GridNearTxLocal tx = fut.tx(); + IgniteInternalFuture<Void> ackFut = null; + + MvccQueryTracker qryTracker = tx.mvccQueryTracker(); + TxMvccInfo mvccInfo = tx.mvccInfo(); - if (mvccInfo != null) { - IgniteInternalFuture<Void> ackFut = fut.context().coordinators().ackTxCommit( - mvccInfo.coordinator(), mvccInfo.version()); + if (qryTracker != null) + ackFut = qryTracker.onTxDone(mvccInfo, fut.context(), true); + else if (mvccInfo != null) { + ackFut = fut.context().coordinators().ackTxCommit(mvccInfo.coordinatorNodeId(), + mvccInfo.version(), + null); + } + if (ackFut != null) { ackFut.listen(new IgniteInClosure<IgniteInternalFuture<Void>>() { @Override public void apply(IgniteInternalFuture<Void> ackFut) { Exception err = null; 
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index a9b60d7..14536e4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -42,6 +42,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorFuture; +import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker; import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; @@ -403,6 +405,20 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit fut.getClass() == CheckRemoteTxMiniFuture.class; } + /** + * + */ + private void ackMvccCoordinatorOnRollback() { + TxMvccInfo mvccInfo = tx.mvccInfo(); + + MvccQueryTracker qryTracker = tx.mvccQueryTracker(); + + if (qryTracker != null) + qryTracker.onTxDone(mvccInfo, cctx, false); + else if (mvccInfo != null) + cctx.coordinators().ackTxRollback(mvccInfo.coordinatorNodeId(), 
mvccInfo.version(), null); + } + /** {@inheritDoc} */ @SuppressWarnings("ForLoopReplaceableByForEach") public void finish(boolean commit, boolean clearThreadMap) { @@ -421,11 +437,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit return; } - if (!commit && tx.mvccInfo() != null) { - TxMvccInfo mvccInfo = tx.mvccInfo(); - - cctx.coordinators().ackTxRollback(mvccInfo.coordinator(), mvccInfo.version()); - } + if (!commit) + ackMvccCoordinatorOnRollback(); try { if (tx.localFinish(commit, clearThreadMap) || (!commit && tx.state() == UNKNOWN)) { @@ -436,7 +449,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit assert mvccInfo != null; - IgniteInternalFuture fut = cctx.coordinators().waitTxsFuture(mvccInfo.coordinator(), waitTxs); + IgniteInternalFuture fut = cctx.coordinators().waitTxsFuture(mvccInfo.coordinatorNodeId(), + waitTxs); add(fut); } @@ -445,7 +459,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit GridDistributedTxMapping mapping = mappings.singleMapping(); if (mapping != null) { - assert !hasFutures() : futures(); + assert !hasFutures() || waitTxs != null : futures(); finish(1, mapping, commit); } @@ -846,6 +860,11 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit return "CheckRemoteTxMiniFuture[nodes=" + fut.nodes() + ", done=" + f.isDone() + "]"; } + else if (f instanceof MvccCoordinatorFuture) { + MvccCoordinatorFuture fut = (MvccCoordinatorFuture)f; + + return "WaitPreviousTxsFut[mvccCrd=" + fut.coordinatorNodeId() + ", done=" + f.isDone() + "]"; + } else return "[loc=true, done=" + f.isDone() + "]"; } http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index a1e37a1..6a59112 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -47,10 +47,10 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; -import org.apache.ignite.internal.processors.cache.GridCacheVersionedFuture; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.GridCacheVersionedFuture; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry; @@ -61,6 +61,10 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLoca import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorChangeAware; +import 
org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; +import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; @@ -89,6 +93,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiClosure; +import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteUuid; @@ -119,7 +124,8 @@ import static org.apache.ignite.transactions.TransactionState.UNKNOWN; * Replicated user transaction. */ @SuppressWarnings("unchecked") -public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeoutObject, AutoCloseable { +public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeoutObject, + AutoCloseable, MvccCoordinatorChangeAware { /** */ private static final long serialVersionUID = 0L; @@ -169,6 +175,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou @GridToStringExclude private TransactionProxyImpl proxy; + /** */ + private MvccQueryTracker mvccTracker; + /** * Empty constructor required for {@link Externalizable}. */ @@ -230,6 +239,21 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou trackTimeout = cctx.time().addTimeoutObject(this); } + /** + * @return Mvcc query version tracker. 
+ */ + MvccQueryTracker mvccQueryTracker() { + return mvccTracker; + } + + /** {@inheritDoc} */ + @Nullable @Override public MvccCoordinatorVersion onMvccCoordinatorChange(MvccCoordinator newCrd) { + if (mvccTracker != null) + return mvccTracker.onMvccCoordinatorChange(newCrd); + + return null; + } + /** {@inheritDoc} */ @Override public boolean near() { return true; @@ -1653,6 +1677,17 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou } /** + * @param cctx Cache context. + * @return Mvcc version for read inside tx (initialized once for OPTIMISTIC SERIALIZABLE and REPEATABLE_READ txs). + */ + private MvccCoordinatorVersion mvccReadVersion(GridCacheContext cctx) { + if (!cctx.mvccEnabled() || mvccTracker == null) + return null; + + return mvccTracker.mvccVersion(); + } + + /** * @param cacheCtx Cache context. * @param keys Keys to get. * @param deserializeBinary Deserialize binary flag. @@ -1665,7 +1700,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou public <K, V> IgniteInternalFuture<Map<K, V>> getAllAsync( final GridCacheContext cacheCtx, @Nullable final AffinityTopologyVersion entryTopVer, - Collection<KeyCacheObject> keys, + final Collection<KeyCacheObject> keys, final boolean deserializeBinary, final boolean skipVals, final boolean keepCacheObjects, @@ -1677,6 +1712,46 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou init(); + if (cacheCtx.mvccEnabled() && (optimistic() && !readCommitted()) && mvccTracker == null) { + // TODO IGNITE-3478: support async tx rollback (e.g. on timeout). 
+ final GridFutureAdapter fut = new GridFutureAdapter(); + + boolean canRemap = cctx.lockedTopologyVersion(null) == null; + + mvccTracker = new MvccQueryTracker(cacheCtx, canRemap, + new IgniteBiInClosure<AffinityTopologyVersion, IgniteCheckedException>() { + @Override public void apply(AffinityTopologyVersion topVer, IgniteCheckedException e) { + if (e == null) { + getAllAsync(cacheCtx, + entryTopVer, + keys, + deserializeBinary, + skipVals, + keepCacheObjects, + skipStore, + recovery, + needVer).listen(new IgniteInClosure<IgniteInternalFuture<Map<Object, Object>>>() { + @Override + public void apply(IgniteInternalFuture<Map<Object, Object>> fut0) { + try { + fut.onDone(fut0.get()); + } catch (IgniteCheckedException e) { + fut.onDone(e); + } + } + }); + } + else + fut.onDone(e); + } + } + ); + + mvccTracker.requestVersion(topologyVersion()); + + return fut; + } + int keysCnt = keys.size(); boolean single = keysCnt == 1; @@ -1781,8 +1856,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou resolveTaskName(), null, txEntry.keepBinary(), - null, - null); // TODO IGNITE-3478 + null, // TODO IGNITE-3478 + null); if (getRes != null) { val = getRes.value(); @@ -2165,8 +2240,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou resolveTaskName(), accessPlc, !deserializeBinary, - null, - null) : null; // TODO IGNITE-3478 + mvccReadVersion(cacheCtx), // TODO IGNITE-3478 + null) : null; if (getRes != null) { val = getRes.value(); @@ -2185,7 +2260,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou resolveTaskName(), accessPlc, !deserializeBinary, - null); // TODO IGNITE-3478 + mvccReadVersion(cacheCtx)); // TODO IGNITE-3478 } if (val != null) { @@ -2464,7 +2539,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou * @param expiryPlc Expiry policy. * @return Future with {@code True} value if loading took place. 
*/ - public IgniteInternalFuture<Void> loadMissing( + private IgniteInternalFuture<Void> loadMissing( final GridCacheContext cacheCtx, AffinityTopologyVersion topVer, boolean readThrough, @@ -2523,7 +2598,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou skipVals, needVer, /*keepCacheObject*/true, - recovery + recovery, + mvccReadVersion(cacheCtx) ).chain(new C1<IgniteInternalFuture<Object>, Void>() { @Override public Void apply(IgniteInternalFuture<Object> f) { try { @@ -2554,7 +2630,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou expiryPlc0, skipVals, needVer, - /*keepCacheObject*/true + /*keepCacheObject*/true, + mvccReadVersion(cacheCtx) ).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Void>() { @Override public Void apply(IgniteInternalFuture<Map<Object, Object>> f) { try { @@ -3311,8 +3388,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou if (!FINISH_FUT_UPD.compareAndSet(this, null, fut0 = new GridNearTxFinishFuture<>(cctx, this, false))) return chainFinishFuture(finishFut, false); - cctx.mvcc().addFuture(fut0, fut0.futureId()); - IgniteInternalFuture<?> prepFut = this.prepFut; if (prepFut == null || prepFut.isDone()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java index e1c6636..80cd4c2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java @@ -58,7 +58,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { private static final int ALLOW_WAIT_TOP_FUT_FLAG_MASK = 0x10; /** */ - private static final int REQUEST_MVCC_CNTR_FLAG_MASK = 0x02; + private static final int REQUEST_MVCC_CNTR_FLAG_MASK = 0x20; /** Future ID. */ private IgniteUuid futId; http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsDiscoveryData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsDiscoveryData.java index 39baec9..d532d8c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsDiscoveryData.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsDiscoveryData.java @@ -24,6 +24,9 @@ import java.io.Serializable; */ public class CacheCoordinatorsDiscoveryData implements Serializable { /** */ + private static final long serialVersionUID = 0L; + + /** */ private MvccCoordinator crd; /** http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java index 85dde15..fd3c2af 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java @@ -39,6 +39,7 @@ import org.apache.ignite.internal.IgniteDiagnosticPrepareContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; @@ -170,13 +171,22 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { return crdVer & CRD_VER_MASK; } + /** + * @param topVer Topology version for cache operation. + * @return Error. 
+ */ + public static IgniteCheckedException noCoordinatorError(AffinityTopologyVersion topVer) { + return new ClusterTopologyServerNotFoundException("Mvcc coordinator is not assigned for " + + "topology version: " + topVer); + } + /** {@inheritDoc} */ @Override public void start() throws IgniteCheckedException { statCntrs = new StatCounter[7]; statCntrs[0] = new CounterWithAvg("CoordinatorTxCounterRequest", "avgTxs"); statCntrs[1] = new CounterWithAvg("MvccCoordinatorVersionResponse", "avgFutTime"); - statCntrs[2] = new StatCounter("CoordinatorTxAckRequest"); + statCntrs[2] = new StatCounter("CoordinatorAckRequestTx"); statCntrs[3] = new CounterWithAvg("CoordinatorTxAckResponse", "avgFutTime"); statCntrs[4] = new StatCounter("TotalRequests"); statCntrs[5] = new StatCounter("CoordinatorWaitTxsRequest"); @@ -314,9 +324,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { GridCacheVersion txVer) { assert !ctx.localNodeId().equals(crd.nodeId()); - MvccVersionFuture fut = new MvccVersionFuture(futIdCntr.incrementAndGet(), - crd.nodeId(), - lsnr); + MvccVersionFuture fut = new MvccVersionFuture(futIdCntr.incrementAndGet(), crd, lsnr); verFuts.put(fut.id, fut); @@ -341,20 +349,9 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { public void ackQueryDone(MvccCoordinator crd, MvccCoordinatorVersion mvccVer) { assert crd != null; - long trackCntr = mvccVer.counter(); - - MvccLongList txs = mvccVer.activeTransactions(); - - if (txs != null) { - for (int i = 0; i < txs.size(); i++) { - long txId = txs.get(i); - - if (txId < trackCntr) - trackCntr = txId; - } - } + long trackCntr = queryTrackCounter(mvccVer); - Message msg = crd.coordinatorVersion() == mvccVer.coordinatorVersion() ? new CoordinatorQueryAckRequest(trackCntr) : + Message msg = crd.coordinatorVersion() == mvccVer.coordinatorVersion() ? 
new CoordinatorAckRequestQuery(trackCntr) : new NewCoordinatorQueryAckRequest(mvccVer.coordinatorVersion(), trackCntr); try { @@ -373,6 +370,27 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { } /** + * @param mvccVer Read version. + * @return Query track counter: minimum of the read version counter and its active transaction counters. + */ + private long queryTrackCounter(MvccCoordinatorVersion mvccVer) { + long trackCntr = mvccVer.counter(); + + MvccLongList txs = mvccVer.activeTransactions(); + + int size = txs.size(); + + for (int i = 0; i < size; i++) { + long txId = txs.get(i); + + if (txId < trackCntr) + trackCntr = txId; + } + + return trackCntr; + } + + /** * @param crd Coordinator. * @return Counter request future. */ @@ -380,7 +398,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { assert crd != null; // TODO IGNITE-3478: special case for local? - MvccVersionFuture fut = new MvccVersionFuture(futIdCntr.incrementAndGet(), crd.nodeId(), null); + MvccVersionFuture fut = new MvccVersionFuture(futIdCntr.incrementAndGet(), crd, null); verFuts.put(fut.id, fut); @@ -432,22 +450,24 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { /** * @param crd Coordinator. - * @param mvccVer Transaction version. + * @param updateVer Transaction update version. + * @param readVer Transaction read version. * @return Acknowledge future. 
*/ - public IgniteInternalFuture<Void> ackTxCommit(UUID crd, MvccCoordinatorVersion mvccVer) { + public IgniteInternalFuture<Void> ackTxCommit(UUID crd, + MvccCoordinatorVersion updateVer, + @Nullable MvccCoordinatorVersion readVer) { assert crd != null; - assert mvccVer != null; + assert updateVer != null; WaitAckFuture fut = new WaitAckFuture(futIdCntr.incrementAndGet(), crd, true); ackFuts.put(fut.id, fut); + CoordinatorAckRequestTx msg = createTxAckMessage(fut.id, updateVer, readVer); + try { - ctx.io().sendToGridTopic(crd, - MSG_TOPIC, - new CoordinatorTxAckRequest(fut.id, mvccVer.counter()), - MSG_POLICY); + ctx.io().sendToGridTopic(crd, MSG_TOPIC, msg, MSG_POLICY); } catch (IgniteCheckedException e) { if (ackFuts.remove(fut.id) != null) { @@ -462,11 +482,45 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { } /** + * @param futId Future ID. + * @param updateVer Update version. + * @param readVer Optional read version. + * @return Message. + */ + private CoordinatorAckRequestTx createTxAckMessage(long futId, + MvccCoordinatorVersion updateVer, + @Nullable MvccCoordinatorVersion readVer) + { + CoordinatorAckRequestTx msg; + + if (readVer != null) { + long trackCntr = queryTrackCounter(readVer); + + if (readVer.coordinatorVersion() == updateVer.coordinatorVersion()) { + msg = new CoordinatorAckRequestTxAndQuery(futId, + updateVer.counter(), + trackCntr); + } + else { + msg = new CoordinatorAckRequestTxAndQueryEx(futId, + updateVer.counter(), + readVer.coordinatorVersion(), + trackCntr); + } + } + else + msg = new CoordinatorAckRequestTx(futId, updateVer.counter()); + + return msg; + } + + /** * @param crdId Coordinator node ID. - * @param mvccVer Transaction version. + * @param updateVer Transaction update version. + * @param readVer Transaction read version. 
*/ - public void ackTxRollback(UUID crdId, MvccCoordinatorVersion mvccVer) { - CoordinatorTxAckRequest msg = new CoordinatorTxAckRequest(0, mvccVer.counter()); + public void ackTxRollback(UUID crdId, MvccCoordinatorVersion updateVer, @Nullable MvccCoordinatorVersion readVer) { + CoordinatorAckRequestTx msg = createTxAckMessage(0, updateVer, readVer); msg.skipResponse(true); @@ -578,7 +632,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { * @param nodeId Node ID. * @param msg Message. */ - private void processCoordinatorQueryAckRequest(UUID nodeId, CoordinatorQueryAckRequest msg) { + private void processCoordinatorQueryAckRequest(UUID nodeId, CoordinatorAckRequestQuery msg) { onQueryDone(nodeId, msg.counter()); } @@ -587,16 +641,23 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { * @param msg Message. */ private void processNewCoordinatorQueryAckRequest(UUID nodeId, NewCoordinatorQueryAckRequest msg) { - prevCrdQueries.onQueryDone(nodeId, msg); + prevCrdQueries.onQueryDone(nodeId, msg.coordinatorVersion(), msg.counter()); } /** * @param nodeId Sender node ID. * @param msg Message. */ - private void processCoordinatorTxAckRequest(UUID nodeId, CoordinatorTxAckRequest msg) { + private void processCoordinatorTxAckRequest(UUID nodeId, CoordinatorAckRequestTx msg) { onTxDone(msg.txCounter()); + if (msg.queryCounter() != COUNTER_NA) { + if (msg.queryCoordinatorVersion() == 0) + onQueryDone(nodeId, msg.queryCounter()); + else + prevCrdQueries.onQueryDone(nodeId, msg.queryCoordinatorVersion(), msg.queryCounter()); + } + if (STAT_CNTRS) statCntrs[2].update(); @@ -907,6 +968,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { } /** + * @param nodeId Node ID. * @param msg Message. 
*/ private void processCoordinatorWaitTxsRequest(final UUID nodeId, final CoordinatorWaitTxsRequest msg) { @@ -954,8 +1016,8 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { } /** - * @param nodeId - * @param msg + * @param nodeId Node ID. + * @param msg Message. */ private void sendFutureResponse(UUID nodeId, CoordinatorWaitTxsRequest msg) { try { @@ -974,18 +1036,21 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { } /** - * @return + * @return Coordinator. */ public MvccCoordinator currentCoordinator() { return curCrd; } + /** + * @param curCrd Coordinator. + */ public void currentCoordinator(MvccCoordinator curCrd) { this.curCrd = curCrd; } /** - * @return + * @return Current coordinator node ID. */ public UUID currentCoordinatorId() { MvccCoordinator curCrd = this.curCrd; @@ -1013,7 +1078,33 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { */ public void processClientActiveQueries(UUID nodeId, @Nullable Map<MvccCounter, Integer> activeQueries) { - prevCrdQueries.processClientActiveQueries(nodeId, activeQueries); + prevCrdQueries.addNodeActiveQueries(nodeId, activeQueries); + } + + /** + * @param nodeId Node ID. + * @param msg Message. + */ + private void processCoordinatorActiveQueriesMessage(UUID nodeId, CoordinatorActiveQueriesMessage msg) { + prevCrdQueries.addNodeActiveQueries(nodeId, msg.activeQueries()); + } + + /** + * @param nodeId Coordinator node ID. + * @param activeQueries Active queries. 
+ */ + public void sendActiveQueries(UUID nodeId, @Nullable Map<MvccCounter, Integer> activeQueries) { + CoordinatorActiveQueriesMessage msg = new CoordinatorActiveQueriesMessage(activeQueries); + + try { + ctx.io().sendToGridTopic(nodeId, + MSG_TOPIC, + msg, + MSG_POLICY); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send active queries to mvcc coordinator: " + e); + } } /** @@ -1070,7 +1161,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { /** * */ - public class MvccVersionFuture extends GridFutureAdapter<MvccCoordinatorVersion> { + private class MvccVersionFuture extends GridFutureAdapter<MvccCoordinatorVersion> implements MvccCoordinatorFuture { /** */ private final Long id; @@ -1078,24 +1169,30 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { private MvccResponseListener lsnr; /** */ - public final UUID crdId; + public final MvccCoordinator crd; /** */ long startTime; /** * @param id Future ID. - * @param crdId Coordinator node ID. + * @param crd Mvcc coordinator. + * @param lsnr Listener. */ - MvccVersionFuture(Long id, UUID crdId, @Nullable MvccResponseListener lsnr) { + MvccVersionFuture(Long id, MvccCoordinator crd, @Nullable MvccResponseListener lsnr) { this.id = id; - this.crdId = crdId; + this.crd = crd; this.lsnr = lsnr; if (STAT_CNTRS) startTime = System.nanoTime(); } + /** {@inheritDoc} */ + @Override public UUID coordinatorNodeId() { + return crd.nodeId(); + } + /** * @param res Response. */ @@ -1103,7 +1200,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { assert res.counter() != COUNTER_NA; if (lsnr != null) - lsnr.onMvccResponse(crdId, res); + lsnr.onMvccResponse(crd.nodeId(), res); onDone(res); } @@ -1122,7 +1219,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { * @param nodeId Failed node ID. 
*/ void onNodeLeft(UUID nodeId ) { - if (crdId.equals(nodeId) && verFuts.remove(id) != null) { + if (crd.nodeId().equals(nodeId) && verFuts.remove(id) != null) { ClusterTopologyCheckedException err = new ClusterTopologyCheckedException("Failed to request mvcc " + "version, coordinator failed: " + nodeId); @@ -1132,14 +1229,14 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Override public String toString() { - return "MvccVersionFuture [crd=" + crdId + ", id=" + id + ']'; + return "MvccVersionFuture [crd=" + crd.nodeId() + ", id=" + id + ']'; } } /** * */ - private class WaitAckFuture extends GridFutureAdapter<Void> { + private class WaitAckFuture extends GridFutureAdapter<Void> implements MvccCoordinatorFuture { /** */ private final long id; @@ -1155,6 +1252,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { /** * @param id Future ID. * @param crdId Coordinator node ID. + * @param ackTx {@code True} if ack tx commit, {@code false} if waits for previous txs. 
*/ WaitAckFuture(long id, UUID crdId, boolean ackTx) { assert crdId != null; @@ -1167,6 +1265,11 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { startTime = System.nanoTime(); } + /** {@inheritDoc} */ + @Override public UUID coordinatorNodeId() { + return crdId; + } + /** * */ @@ -1247,12 +1350,12 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { if (msg instanceof CoordinatorTxCounterRequest) processCoordinatorTxCounterRequest(nodeId, (CoordinatorTxCounterRequest)msg); - else if (msg instanceof CoordinatorTxAckRequest) - processCoordinatorTxAckRequest(nodeId, (CoordinatorTxAckRequest)msg); + else if (msg instanceof CoordinatorAckRequestTx) + processCoordinatorTxAckRequest(nodeId, (CoordinatorAckRequestTx)msg); else if (msg instanceof CoordinatorFutureResponse) processCoordinatorAckResponse(nodeId, (CoordinatorFutureResponse)msg); - else if (msg instanceof CoordinatorQueryAckRequest) - processCoordinatorQueryAckRequest(nodeId, (CoordinatorQueryAckRequest)msg); + else if (msg instanceof CoordinatorAckRequestQuery) + processCoordinatorQueryAckRequest(nodeId, (CoordinatorAckRequestQuery)msg); else if (msg instanceof CoordinatorQueryVersionRequest) processCoordinatorQueryVersionRequest(nodeId, (CoordinatorQueryVersionRequest)msg); else if (msg instanceof MvccCoordinatorVersionResponse) @@ -1261,6 +1364,8 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { processCoordinatorWaitTxsRequest(nodeId, (CoordinatorWaitTxsRequest)msg); else if (msg instanceof NewCoordinatorQueryAckRequest) processNewCoordinatorQueryAckRequest(nodeId, (NewCoordinatorQueryAckRequest)msg); + else if (msg instanceof CoordinatorActiveQueriesMessage) + processCoordinatorActiveQueriesMessage(nodeId, (CoordinatorActiveQueriesMessage)msg); else U.warn(log, "Unexpected message received [node=" + nodeId + ", msg=" + msg + ']'); } 
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestQuery.java new file mode 100644 index 0000000..e51ec90 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestQuery.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * + */ +public class CoordinatorAckRequestQuery implements MvccCoordinatorMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private long cntr; + + /** + * Required by {@link GridIoMessageFactory}. + */ + public CoordinatorAckRequestQuery() { + // No-op. + } + + /** + * @param cntr Query counter. + */ + CoordinatorAckRequestQuery(long cntr) { + this.cntr = cntr; + } + + /** {@inheritDoc} */ + @Override public boolean waitForCoordinatorInit() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean processedFromNioThread() { + return true; + } + + /** + * @return Counter. 
+ */ + public long counter() { + return cntr; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeLong("cntr", cntr)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + cntr = reader.readLong("cntr"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(CoordinatorAckRequestQuery.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 134; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CoordinatorAckRequestQuery.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java new file mode 100644 index 0000000..c0512f0 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * + */ +public class CoordinatorAckRequestTx implements MvccCoordinatorMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private static final int SKIP_RESPONSE_FLAG_MASK = 0x01; + + /** */ + private long futId; + + /** */ + private long txCntr; + + /** */ + private byte flags; + + /** + * Required by {@link GridIoMessageFactory}. + */ + public CoordinatorAckRequestTx() { + // No-op. + } + + /** + * @param futId Future ID. + * @param txCntr Counter assigned to transaction. + */ + CoordinatorAckRequestTx(long futId, long txCntr) { + this.futId = futId; + this.txCntr = txCntr; + } + + /** {@inheritDoc} */ + long queryCounter() { + return CacheCoordinatorsProcessor.COUNTER_NA; + } + + /** {@inheritDoc} */ + long queryCoordinatorVersion() { + return 0; + } + + /** {@inheritDoc} */ + @Override public boolean waitForCoordinatorInit() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean processedFromNioThread() { + return true; + } + + /** + * @return Future ID. + */ + long futureId() { + return futId; + } + + /** + * @return {@code True} if response message is not needed. + */ + boolean skipResponse() { + return (flags & SKIP_RESPONSE_FLAG_MASK) != 0; + } + + /** + * @param val {@code True} if response message is not needed. + */ + void skipResponse(boolean val) { + if (val) + flags |= SKIP_RESPONSE_FLAG_MASK; + else + flags &= ~SKIP_RESPONSE_FLAG_MASK; + } + + /** + * @return Counter assigned to transaction. 
+ */ + public long txCounter() { + return txCntr; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeByte("flags", flags)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeLong("futId", futId)) + return false; + + writer.incrementState(); + + case 2: + if (!writer.writeLong("txCntr", txCntr)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + flags = reader.readByte("flags"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + futId = reader.readLong("futId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 2: + txCntr = reader.readLong("txCntr"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(CoordinatorAckRequestTx.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 131; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 3; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CoordinatorAckRequestTx.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQuery.java new file mode 100644 index 0000000..86c3223 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQuery.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * + */ +public class CoordinatorAckRequestTxAndQuery extends CoordinatorAckRequestTx { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private long qryCntr; + + /** + * Required by {@link GridIoMessageFactory}. + */ + public CoordinatorAckRequestTxAndQuery() { + // No-op. + } + + /** + * @param futId Future ID. + * @param txCntr Counter assigned to transaction update. + * @param qryCntr Counter assigned for transaction reads. + */ + CoordinatorAckRequestTxAndQuery(long futId, long txCntr, long qryCntr) { + super(futId, txCntr); + + this.qryCntr = qryCntr; + } + + /** {@inheritDoc} */ + @Override long queryCounter() { + return qryCntr; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!super.writeTo(buf, writer)) + return false; + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 3: + if (!writer.writeLong("qryCntr", qryCntr)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + if (!super.readFrom(buf, reader)) + return false; + + switch (reader.state()) { + case 3: + qryCntr = reader.readLong("qryCntr"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return 
reader.afterMessageRead(CoordinatorAckRequestTxAndQuery.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 141; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 4; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CoordinatorAckRequestTxAndQuery.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQueryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQueryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQueryEx.java new file mode 100644 index 0000000..6f6f712 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQueryEx.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * + */ +public class CoordinatorAckRequestTxAndQueryEx extends CoordinatorAckRequestTx { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private long qryCrdVer; + + /** */ + private long qryCntr; + + /** + * Required by {@link GridIoMessageFactory}. + */ + public CoordinatorAckRequestTxAndQueryEx() { + // No-op. + } + + /** + * @param futId Future ID. + * @param txCntr Counter assigned to transaction update. + * @param qryCrdVer Version of coordinator assigned read counter. + * @param qryCntr Counter assigned for transaction reads. + */ + CoordinatorAckRequestTxAndQueryEx(long futId, long txCntr, long qryCrdVer, long qryCntr) { + super(futId, txCntr); + + this.qryCrdVer = qryCrdVer; + this.qryCntr = qryCntr; + } + + /** {@inheritDoc} */ + @Override long queryCoordinatorVersion() { + return qryCrdVer; + } + + /** {@inheritDoc} */ + @Override long queryCounter() { + return qryCntr; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!super.writeTo(buf, writer)) + return false; + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 3: + if (!writer.writeLong("qryCntr", qryCntr)) + return false; + + writer.incrementState(); + + case 4: + if (!writer.writeLong("qryCrdVer", qryCrdVer)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { 
+ reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + if (!super.readFrom(buf, reader)) + return false; + + switch (reader.state()) { + case 3: + qryCntr = reader.readLong("qryCntr"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 4: + qryCrdVer = reader.readLong("qryCrdVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(CoordinatorAckRequestTxAndQueryEx.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 142; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 5; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CoordinatorAckRequestTxAndQueryEx.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorActiveQueriesMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorActiveQueriesMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorActiveQueriesMessage.java new file mode 100644 index 0000000..49b1adb --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorActiveQueriesMessage.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import java.nio.ByteBuffer; +import java.util.Map; +import org.apache.ignite.internal.GridDirectMap; +import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.jetbrains.annotations.Nullable; + +/** + * MVCC coordinator message whose only serialized field is the {@code activeQrys} map ({@link MvccCounter} keys, {@code Integer} values). + */ +public class CoordinatorActiveQueriesMessage implements MvccCoordinatorMessage { + /** Serial version UID. */ + private static final long serialVersionUID = 0L; + + /** Active queries; stays {@code null} after the no-arg constructor until {@link #readFrom} assigns it. */ + @GridDirectMap(keyType = Message.class, valueType = Integer.class) + private Map<MvccCounter, Integer> activeQrys; + + /** + * Required by {@link GridIoMessageFactory}. + */ + public CoordinatorActiveQueriesMessage() { + // No-op. + } + + /** + * @param activeQrys Active queries. + */ + CoordinatorActiveQueriesMessage(Map<MvccCounter, Integer> activeQrys) { + this.activeQrys = activeQrys; + } + + /** + * @return Active queries, possibly {@code null}. 
+ */ + @Nullable Map<MvccCounter, Integer> activeQueries() { + return activeQrys; + } + + /** {@inheritDoc} */ + @Override public boolean waitForCoordinatorInit() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean processedFromNioThread() { + return true; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeMap("activeQrys", activeQrys, MessageCollectionItemType.MSG, MessageCollectionItemType.INT)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + activeQrys = reader.readMap("activeQrys", MessageCollectionItemType.MSG, MessageCollectionItemType.INT, false); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(CoordinatorActiveQueriesMessage.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 144; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CoordinatorActiveQueriesMessage.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorFutureResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorFutureResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorFutureResponse.java index e7eff42..777927c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorFutureResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorFutureResponse.java @@ -28,6 +28,9 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter; */ public class CoordinatorFutureResponse implements MvccCoordinatorMessage { /** */ + private static final long serialVersionUID = 0L; + + /** */ private long futId; /** http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryAckRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryAckRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryAckRequest.java deleted file mode 100644 index 602d3b4..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryAckRequest.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.mvcc; - -import java.nio.ByteBuffer; -import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; - -/** - * - */ -public class CoordinatorQueryAckRequest implements MvccCoordinatorMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private long cntr; - - /** - * Required by {@link GridIoMessageFactory}. - */ - public CoordinatorQueryAckRequest() { - // No-op. - } - - /** - * @param cntr Query counter. - */ - CoordinatorQueryAckRequest(long cntr) { - this.cntr = cntr; - } - - /** {@inheritDoc} */ - @Override public boolean waitForCoordinatorInit() { - return false; - } - - /** {@inheritDoc} */ - @Override public boolean processedFromNioThread() { - return true; - } - - /** - * @return Counter. 
- */ - public long counter() { - return cntr; - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType(), fieldsCount())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 0: - if (!writer.writeLong("cntr", cntr)) - return false; - - writer.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - if (!reader.beforeMessageRead()) - return false; - - switch (reader.state()) { - case 0: - cntr = reader.readLong("cntr"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - } - - return reader.afterMessageRead(CoordinatorQueryAckRequest.class); - } - - /** {@inheritDoc} */ - @Override public short directType() { - return 134; - } - - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return 1; - } - - /** {@inheritDoc} */ - @Override public void onAckReceived() { - // No-op. 
- } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(CoordinatorQueryAckRequest.class, this); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java deleted file mode 100644 index 14cd6a9..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java +++ /dev/null @@ -1,194 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.cache.mvcc; - -import java.nio.ByteBuffer; -import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; -import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; - -/** - * - */ -public class CoordinatorTxAckRequest implements MvccCoordinatorMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private static final int SKIP_RESPONSE_FLAG_MASK = 0x01; - - /** */ - private long futId; - - /** */ - private long txCntr; - - /** */ - private byte flags; - - /** - * Required by {@link GridIoMessageFactory}. - */ - public CoordinatorTxAckRequest() { - // No-op. - } - - /** - * @param futId Future ID. - * @param txCntr Counter assigned to transaction. - */ - CoordinatorTxAckRequest(long futId, long txCntr) { - this.futId = futId; - this.txCntr = txCntr; - } - - /** {@inheritDoc} */ - @Override public boolean waitForCoordinatorInit() { - return false; - } - - /** {@inheritDoc} */ - @Override public boolean processedFromNioThread() { - return true; - } - - /** - * @return Future ID. - */ - long futureId() { - return futId; - } - - /** - * @return {@code True} if response message is not needed. - */ - boolean skipResponse() { - return (flags & SKIP_RESPONSE_FLAG_MASK) != 0; - } - - /** - * @param val {@code True} if response message is not needed. - */ - void skipResponse(boolean val) { - if (val) - flags |= SKIP_RESPONSE_FLAG_MASK; - else - flags &= ~SKIP_RESPONSE_FLAG_MASK; - } - - /** - * @return Counter assigned tp transaction. 
- */ - public long txCounter() { - return txCntr; - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType(), fieldsCount())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 0: - if (!writer.writeByte("flags", flags)) - return false; - - writer.incrementState(); - - case 1: - if (!writer.writeLong("futId", futId)) - return false; - - writer.incrementState(); - - case 2: - if (!writer.writeLong("txCntr", txCntr)) - return false; - - writer.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - if (!reader.beforeMessageRead()) - return false; - - switch (reader.state()) { - case 0: - flags = reader.readByte("flags"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 1: - futId = reader.readLong("futId"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 2: - txCntr = reader.readLong("txCntr"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - } - - return reader.afterMessageRead(CoordinatorTxAckRequest.class); - } - - /** {@inheritDoc} */ - @Override public short directType() { - return 131; - } - - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return 3; - } - - /** {@inheritDoc} */ - @Override public void onAckReceived() { - // No-op. 
- } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(CoordinatorTxAckRequest.class, this); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorWaitTxsRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorWaitTxsRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorWaitTxsRequest.java index f40df72..0d75f0c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorWaitTxsRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorWaitTxsRequest.java @@ -28,6 +28,9 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter; */ public class CoordinatorWaitTxsRequest implements MvccCoordinatorMessage { /** */ + private static final long serialVersionUID = 0L; + + /** */ private long futId; /** */
