Repository: ignite Updated Branches: refs/heads/ignite-6149 91bbb7cd2 -> 085a32190
ignite-6149 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/085a3219 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/085a3219 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/085a3219 Branch: refs/heads/ignite-6149 Commit: 085a3219088bc1610d1928c437f075a6fb9a4f9b Parents: 91bbb7c Author: sboikov <[email protected]> Authored: Fri Sep 8 16:02:16 2017 +0300 Committer: sboikov <[email protected]> Committed: Fri Sep 8 17:16:26 2017 +0300 ---------------------------------------------------------------------- .../communication/GridIoMessageFactory.java | 14 +- .../cache/IgniteCacheOffheapManager.java | 13 +- .../cache/IgniteCacheOffheapManagerImpl.java | 59 +++++- .../mvcc/CacheCoordinatorsSharedManager.java | 112 ++++++++++-- .../cache/mvcc/CoordinatorFutureResponse.java | 122 +++++++++++++ .../cache/mvcc/CoordinatorTxAckResponse.java | 122 ------------- .../cache/mvcc/CoordinatorWaitTxsRequest.java | 150 +++++++++++++++ .../processors/cache/mvcc/MvccCounter.java | 182 ------------------- .../cache/persistence/CacheDataRow.java | 2 +- .../cache/persistence/CacheDataRowAdapter.java | 2 +- .../cache/persistence/CacheSearchRow.java | 2 +- .../persistence/GridCacheOffheapManager.java | 7 +- .../cache/tree/AbstractDataInnerIO.java | 4 +- .../cache/tree/AbstractDataLeafIO.java | 6 +- .../processors/cache/tree/CacheDataTree.java | 6 +- .../processors/cache/tree/MvccDataRow.java | 18 +- .../processors/cache/tree/MvccSearchRow.java | 12 +- .../processors/cache/tree/SearchRow.java | 2 +- .../cache/mvcc/CacheMvccTransactionsTest.java | 93 ++++++++++ .../database/FreeListImplSelfTest.java | 4 +- .../processors/query/h2/opt/GridH2Row.java | 2 +- 21 files changed, 564 insertions(+), 370 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/ignite/blob/085a3219/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 1c1bfb7..cf3bd2a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -105,10 +105,10 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearUnlo import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorQueryAckRequest; import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorQueryVersionRequest; import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorTxAckRequest; -import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorTxAckResponse; +import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorFutureResponse; import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorTxCounterRequest; +import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorWaitTxsRequest; import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersionResponse; -import org.apache.ignite.internal.processors.cache.mvcc.MvccCounter; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; @@ -893,7 +893,7 @@ public class GridIoMessageFactory implements MessageFactory { break; case 132: - msg = new CoordinatorTxAckResponse(); + msg = new CoordinatorFutureResponse(); break; @@ -907,13 +907,13 @@ 
public class GridIoMessageFactory implements MessageFactory { break; - case 135: - msg = new MvccCounter(); + case 136: + msg = new MvccCoordinatorVersionResponse(); return msg; - case 136: - msg = new MvccCoordinatorVersionResponse(); + case 137: + msg = new CoordinatorWaitTxsRequest(); return msg; http://git-wip-us.apache.org/repos/asf/ignite/blob/085a3219/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java index a8c2f7e..7c4d209 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java @@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseL import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.GridAtomicLong; +import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.IgniteTree; import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.internal.util.lang.GridCursor; @@ -173,7 +174,15 @@ public interface IgniteCacheOffheapManager { public void invoke(GridCacheContext cctx, KeyCacheObject key, GridDhtLocalPartition part, OffheapInvokeClosure c) throws IgniteCheckedException; - public void mvccUpdate(GridCacheMapEntry entry, + /** + * @param entry Entry. + * @param val Value. + * @param ver Cache version. + * @param mvccVer Mvcc update version. 
+ * @return Transactions to wait for before finishing current transaction. + * @throws IgniteCheckedException If failed. + */ + @Nullable public GridLongList mvccUpdate(GridCacheMapEntry entry, CacheObject val, GridCacheVersion ver, MvccCoordinatorVersion mvccVer) throws IgniteCheckedException; @@ -466,7 +475,7 @@ public interface IgniteCacheOffheapManager { long expireTime, @Nullable CacheDataRow oldRow) throws IgniteCheckedException; - void mvccUpdate( + GridLongList mvccUpdate( GridCacheContext cctx, KeyCacheObject key, CacheObject val, http://git-wip-us.apache.org/repos/asf/ignite/blob/085a3219/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index 9a8be390..5549c78 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -363,11 +363,11 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } /** {@inheritDoc} */ - @Override public void mvccUpdate(GridCacheMapEntry entry, + @Override public GridLongList mvccUpdate(GridCacheMapEntry entry, CacheObject val, GridCacheVersion ver, MvccCoordinatorVersion mvccVer) throws IgniteCheckedException { - dataStore(entry.localPartition()).mvccUpdate(entry.context(), + return dataStore(entry.localPartition()).mvccUpdate(entry.context(), entry.key(), val, ver, @@ -1302,8 +1302,17 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager return dataRow; } + private int compare(CacheDataRow row, long crdVer, long mvccCntr) { + int cmp = 
Long.compare(row.mvccCoordinatorVersion(), crdVer); + + if (cmp != 0) + return cmp; + + return Long.compare(row.mvccUpdateCounter(), mvccCntr); + } + /** {@inheritDoc} */ - @Override public void mvccUpdate(GridCacheContext cctx, + @Override public GridLongList mvccUpdate(GridCacheContext cctx, KeyCacheObject key, CacheObject val, GridCacheVersion ver, @@ -1336,7 +1345,45 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager if (grp.sharedGroup() && dataRow.cacheId() == CU.UNDEFINED_CACHE_ID) dataRow.cacheId(cctx.cacheId()); - dataTree.putx(dataRow); + boolean old = dataTree.putx(dataRow); + + assert !old; + + GridLongList activeTxs = mvccVer.activeTransactions(); + + // TODO IGNITE-3484: need special method. + GridCursor<CacheDataRow> cur = dataTree.find( + new MvccSearchRow(cacheId, key, mvccVer.coordinatorVersion(), mvccVer.counter() - 1), + new MvccSearchRow(cacheId, key, 1, 1)); + + GridLongList waitTxs = null; + + boolean first = true; + + while (cur.next()) { + CacheDataRow oldVal = cur.get(); + + if (activeTxs != null && oldVal.mvccCoordinatorVersion() == mvccVer.coordinatorVersion() && + activeTxs.contains(oldVal.mvccUpdateCounter())) { + if (waitTxs == null) + waitTxs = new GridLongList(); + + waitTxs.add(oldVal.mvccUpdateCounter()); + } + else if (!first) { + int cmp = compare(oldVal, mvccVer.coordinatorVersion(), mvccVer.cleanupVersion()); + + if (cmp <= 0) { + boolean rmvd = dataTree.removex(oldVal); + + assert rmvd; + } + } + + first = false; + } + + return waitTxs; } finally { busyLock.leaveBusy(); @@ -1588,12 +1635,12 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager while (cur.next()) { CacheDataRow row0 = cur.get(); - assert row0.mvccUpdateTopologyVersion() > 0 : row0; + assert row0.mvccCoordinatorVersion() > 0 : row0; boolean visible; if (txs != null) { - visible = row0.mvccUpdateTopologyVersion() != ver.coordinatorVersion() + visible = row0.mvccCoordinatorVersion() != 
ver.coordinatorVersion() || !txs.contains(row0.mvccUpdateCounter()); } else http://git-wip-us.apache.org/repos/asf/ignite/blob/085a3219/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java index ccd22d8..7034aca 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java @@ -17,9 +17,7 @@ package org.apache.ignite.internal.processors.cache.mvcc; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -78,7 +76,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager private final ConcurrentMap<Long, MvccVersionFuture> verFuts = new ConcurrentHashMap<>(); /** */ - private final ConcurrentMap<Long, TxAckFuture> ackFuts = new ConcurrentHashMap<>(); + private final ConcurrentMap<Long, WaitAckFuture> ackFuts = new ConcurrentHashMap<>(); /** */ private final AtomicLong futIdCntr = new AtomicLong(); @@ -86,7 +84,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager /** */ private final CountDownLatch crdLatch = new CountDownLatch(1); - /** */ + /** Topology version when local node was assigned as coordinator. */ private long crdVer; /** {@inheritDoc} */ @@ -193,11 +191,39 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager /** * @param crd Coordinator. + * @param txs Transaction IDs. + * @return Future. 
+ */ + public IgniteInternalFuture<Void> waitTxsFuture(ClusterNode crd, GridLongList txs) { + WaitAckFuture fut = new WaitAckFuture(futIdCntr.incrementAndGet(), crd); + + ackFuts.put(fut.id, fut); + + try { + cctx.gridIO().sendToGridTopic(crd, + TOPIC_CACHE_COORDINATOR, + new CoordinatorWaitTxsRequest(fut.id, txs), + SYSTEM_POOL); + } + catch (ClusterTopologyCheckedException e) { + if (ackFuts.remove(fut.id) != null) + fut.onDone(); // No need to ack, finish without error. + } + catch (IgniteCheckedException e) { + if (ackFuts.remove(fut.id) != null) + fut.onDone(e); + } + + return fut; + } + + /** + * @param crd Coordinator. * @param txId Transaction ID. * @return Acknowledge future. */ public IgniteInternalFuture<Void> ackTxCommit(ClusterNode crd, GridCacheVersion txId) { - TxAckFuture fut = new TxAckFuture(futIdCntr.incrementAndGet(), crd); + WaitAckFuture fut = new WaitAckFuture(futIdCntr.incrementAndGet(), crd); ackFuts.put(fut.id, fut); @@ -345,7 +371,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager try { cctx.gridIO().sendToGridTopic(nodeId, TOPIC_CACHE_COORDINATOR, - new CoordinatorTxAckResponse(msg.futureId()), + new CoordinatorFutureResponse(msg.futureId()), SYSTEM_POOL); } catch (ClusterTopologyCheckedException e) { @@ -362,8 +388,8 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager * @param nodeId Sender node ID. * @param msg Message. 
*/ - private void processCoordinatorTxAckResponse(UUID nodeId, CoordinatorTxAckResponse msg) { - TxAckFuture fut = ackFuts.remove(msg.futureId()); + private void processCoordinatorTxAckResponse(UUID nodeId, CoordinatorFutureResponse msg) { + WaitAckFuture fut = ackFuts.remove(msg.futureId()); if (fut != null) fut.onResponse(); @@ -399,14 +425,14 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager assert old == null : txId; - long minQry = 0; + long cleanupVer = Long.MAX_VALUE; - for (Long qryCntr : activeQueries.keySet()) { - if (qryCntr < minQry) - minQry = qryCntr; + for (Long qryVer : activeQueries.keySet()) { + if (qryVer < cleanupVer) + cleanupVer = qryVer - 1; } - return new MvccCoordinatorVersionResponse(futId, crdVer, nextCtr, txs, minQry); + return new MvccCoordinatorVersionResponse(futId, crdVer, nextCtr, txs, cleanupVer); } /** @@ -418,6 +444,8 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager assert cntr != null; committedCntr.setIfGreater(cntr); + + notifyAll(); // TODO IGNITE-3478. } /** @@ -465,6 +493,52 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager } /** + * @param msg Message. + */ + private void processCoordinatorWaitTxsRequest(UUID nodeId, CoordinatorWaitTxsRequest msg) { + GridLongList txs = msg.transactions(); + + // TODO IGNITE-3478. 
+ synchronized (this) { + for (int i = 0; i < txs.size(); i++) { + long txId = txs.get(i); + + while (hasActiveTx(txId)) { + try { + wait(); + } + catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + } + try { + cctx.gridIO().sendToGridTopic(nodeId, + TOPIC_CACHE_COORDINATOR, + new CoordinatorFutureResponse(msg.futureId()), + SYSTEM_POOL); + } + catch (ClusterTopologyCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send tx ack response, node left [msg=" + msg + ", node=" + nodeId + ']'); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send tx ack response [msg=" + msg + ", node=" + nodeId + ']', e); + } + + } + + private boolean hasActiveTx(long txId) { + for (Long id : activeTxs.values()) { + if (id == txId) + return true; + } + + return false; + } + + /** * @param topVer Topology version. * @return MVCC coordinator for given topology version. */ @@ -552,7 +626,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager /** * */ - private class TxAckFuture extends GridFutureAdapter<Void> { + private class WaitAckFuture extends GridFutureAdapter<Void> { /** */ private final long id; @@ -563,7 +637,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager * @param id Future ID. * @param crd Coordinator. 
*/ - TxAckFuture(long id, ClusterNode crd) { + WaitAckFuture(long id, ClusterNode crd) { this.id = id; this.crd = crd; } @@ -599,7 +673,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager for (MvccVersionFuture fut : verFuts.values()) fut.onNodeLeft(nodeId); - for (TxAckFuture fut : ackFuts.values()) + for (WaitAckFuture fut : ackFuts.values()) fut.onNodeLeft(nodeId); } } @@ -631,14 +705,16 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager processCoordinatorTxCounterRequest(nodeId, (CoordinatorTxCounterRequest)msg); else if (msg instanceof CoordinatorTxAckRequest) processCoordinatorTxAckRequest(nodeId, (CoordinatorTxAckRequest)msg); - else if (msg instanceof CoordinatorTxAckResponse) - processCoordinatorTxAckResponse(nodeId, (CoordinatorTxAckResponse)msg); + else if (msg instanceof CoordinatorFutureResponse) + processCoordinatorTxAckResponse(nodeId, (CoordinatorFutureResponse)msg); else if (msg instanceof CoordinatorQueryAckRequest) processCoordinatorQueryAckRequest((CoordinatorQueryAckRequest)msg); else if (msg instanceof CoordinatorQueryVersionRequest) processCoordinatorQueryVersionRequest(nodeId, (CoordinatorQueryVersionRequest)msg); else if (msg instanceof MvccCoordinatorVersionResponse) processCoordinatorQueryVersionResponse(nodeId, (MvccCoordinatorVersionResponse) msg); + else if (msg instanceof CoordinatorWaitTxsRequest) + processCoordinatorWaitTxsRequest(nodeId, (CoordinatorWaitTxsRequest)msg); else U.warn(log, "Unexpected message received [node=" + nodeId + ", msg=" + msg + ']'); } http://git-wip-us.apache.org/repos/asf/ignite/blob/085a3219/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorFutureResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorFutureResponse.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorFutureResponse.java new file mode 100644 index 0000000..4033733 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorFutureResponse.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * + */ +public class CoordinatorFutureResponse implements MvccCoordinatorMessage { + /** */ + private long futId; + + /** + * Required by {@link GridIoMessageFactory}. + */ + public CoordinatorFutureResponse() { + // No-op. + } + + /** + * @param futId Future ID. + */ + CoordinatorFutureResponse(long futId) { + this.futId = futId; + } + + /** + * @return Future ID. 
+ */ + long futureId() { + return futId; + } + + /** {@inheritDoc} */ + @Override public boolean waitForCoordinatorInit() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeLong("futId", futId)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + futId = reader.readLong("futId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(CoordinatorFutureResponse.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 132; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CoordinatorFutureResponse.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/085a3219/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckResponse.java deleted file mode 100644 index 059416c..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckResponse.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.cache.mvcc; - -import java.nio.ByteBuffer; -import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; - -/** - * - */ -public class CoordinatorTxAckResponse implements MvccCoordinatorMessage { - /** */ - private long futId; - - /** - * Required by {@link GridIoMessageFactory}. - */ - public CoordinatorTxAckResponse() { - // No-op. - } - - /** - * @param futId Future ID. - */ - CoordinatorTxAckResponse(long futId) { - this.futId = futId; - } - - /** - * @return Future ID. - */ - long futureId() { - return futId; - } - - /** {@inheritDoc} */ - @Override public boolean waitForCoordinatorInit() { - return false; - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType(), fieldsCount())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 0: - if (!writer.writeLong("futId", futId)) - return false; - - writer.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - if (!reader.beforeMessageRead()) - return false; - - switch (reader.state()) { - case 0: - futId = reader.readLong("futId"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - } - - return reader.afterMessageRead(CoordinatorTxAckResponse.class); - } - - /** {@inheritDoc} */ - @Override public short directType() { - return 132; - } - - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return 1; - } - - /** {@inheritDoc} */ - @Override public void onAckReceived() { - // No-op. 
- } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(CoordinatorTxAckResponse.class, this); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/085a3219/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorWaitTxsRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorWaitTxsRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorWaitTxsRequest.java new file mode 100644 index 0000000..e66e2b9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorWaitTxsRequest.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.util.GridLongList; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * + */ +public class CoordinatorWaitTxsRequest implements MvccCoordinatorMessage { + /** */ + private long futId; + + /** */ + private GridLongList txs; + + /** + * + */ + public CoordinatorWaitTxsRequest() { + // No-op. + } + + /** + * @param futId Future ID. + * @param txs Transactions to wait for. + */ + public CoordinatorWaitTxsRequest(long futId, GridLongList txs) { + assert txs != null && txs.size() > 0 : txs; + + this.futId = futId; + this.txs = txs; + } + + /** + * @return Future ID. + */ + long futureId() { + return futId; + } + + /** + * @return Transactions to wait for. + */ + GridLongList transactions() { + return txs; + } + + /** {@inheritDoc} */ + @Override public boolean waitForCoordinatorInit() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeLong("futId", futId)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeMessage("txs", txs)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + futId = reader.readLong("futId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + txs = 
reader.readMessage("txs"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(CoordinatorWaitTxsRequest.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 137; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 2; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CoordinatorWaitTxsRequest.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/085a3219/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java deleted file mode 100644 index 161e8d4..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.cache.mvcc; - -import java.nio.ByteBuffer; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; -import org.jetbrains.annotations.NotNull; - -/** - * - */ -public class MvccCounter implements Comparable<MvccCounter>, Message { - /** */ - private long crdVer; - - /** */ - private long cntr; - - /** */ - private long cleanupCntr; - - /** - * - */ - public MvccCounter() { - // No-op. - } - - /** - * @param crdVer Coordinator version. - * @param cntr Coordinator counter. - */ - public MvccCounter(long crdVer, long cntr, long cleanupCntr) { - assert crdVer > 0 : crdVer; - assert cntr != CacheCoordinatorsSharedManager.COUNTER_NA; - - this.crdVer = crdVer; - this.cntr = cntr; - } - - /** {@inheritDoc} */ - @Override public int compareTo(@NotNull MvccCounter other) { - int cmp = Long.compare(crdVer, other.crdVer); - - if (cmp != 0) - return cmp; - - return Long.compare(cntr, other.cntr); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - MvccCounter that = (MvccCounter) o; - - return crdVer == that.crdVer && cntr == that.cntr; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int res = (int) (crdVer ^ (crdVer >>> 32)); - - res = 31 * res + (int) (cntr ^ (cntr >>> 32)); - - return res; - } - - /** - * @return Coordinator version. - */ - public long coordinatorVersion() { - return crdVer; - } - - /** - * @return Counters. 
- */ - public long counter() { - return cntr; - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType(), fieldsCount())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 0: - if (!writer.writeLong("cntr", cntr)) - return false; - - writer.incrementState(); - - case 1: - if (!writer.writeLong("crdVer", crdVer)) - return false; - - writer.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - if (!reader.beforeMessageRead()) - return false; - - switch (reader.state()) { - case 0: - cntr = reader.readLong("cntr"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 1: - crdVer = reader.readLong("crdVer"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - } - - return reader.afterMessageRead(MvccCounter.class); - } - - /** {@inheritDoc} */ - @Override public short directType() { - return 135; - } - - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return 2; - } - - /** {@inheritDoc} */ - @Override public void onAckReceived() { - // No-op. 
- } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(MvccCounter.class, this); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/085a3219/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java index 5316eef..7c52c7d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java @@ -55,7 +55,7 @@ public interface CacheDataRow extends CacheSearchRow { */ public void key(KeyCacheObject key); - public long mvccUpdateTopologyVersion(); + public long mvccCoordinatorVersion(); public long mvccUpdateCounter(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/085a3219/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java index 11da76d..4aef9f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java @@ -573,7 +573,7 @@ public class CacheDataRowAdapter implements CacheDataRow { } /** {@inheritDoc} */ - @Override public long mvccUpdateTopologyVersion() { + @Override public long mvccCoordinatorVersion() { return 0; } 
http://git-wip-us.apache.org/repos/asf/ignite/blob/085a3219/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java index 7834a03..533d8f5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java @@ -43,7 +43,7 @@ public interface CacheSearchRow { */ public int cacheId(); - public long mvccUpdateTopologyVersion(); + public long mvccCoordinatorVersion(); public long mvccUpdateCounter(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/085a3219/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java index 5a88f9c..ffcfd8e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java @@ -67,6 +67,7 @@ import org.apache.ignite.internal.processors.cache.tree.CacheDataRowStore; import org.apache.ignite.internal.processors.cache.tree.CacheDataTree; import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import 
org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.lang.GridCursor; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -830,7 +831,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } /** {@inheritDoc} */ - @Override public long mvccUpdateTopologyVersion() { + @Override public long mvccCoordinatorVersion() { return 0; // TODO IGNITE-3478. } } @@ -1247,14 +1248,14 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } /** {@inheritDoc} */ - @Override public void mvccUpdate(GridCacheContext cctx, + @Override public GridLongList mvccUpdate(GridCacheContext cctx, KeyCacheObject key, CacheObject val, GridCacheVersion ver, MvccCoordinatorVersion mvccVer) throws IgniteCheckedException { CacheDataStore delegate = init0(false); - delegate.mvccUpdate(cctx, key, val, ver, mvccVer); + return delegate.mvccUpdate(cctx, key, val, ver, mvccVer); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/085a3219/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java index a1dacd0..d87b5ca 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java @@ -59,10 +59,10 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> i } if (storeMvccVersion()) { - assert row.mvccUpdateTopologyVersion() > 0 : row; + assert row.mvccCoordinatorVersion() > 0 : row; assert row.mvccUpdateCounter() != 
CacheCoordinatorsSharedManager.COUNTER_NA : row; - PageUtils.putLong(pageAddr, off, row.mvccUpdateTopologyVersion()); + PageUtils.putLong(pageAddr, off, row.mvccCoordinatorVersion()); off += 8; PageUtils.putLong(pageAddr, off, row.mvccUpdateCounter()); http://git-wip-us.apache.org/repos/asf/ignite/blob/085a3219/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java index bc27a21..0be84c6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java @@ -58,13 +58,13 @@ public abstract class AbstractDataLeafIO extends BPlusLeafIO<CacheSearchRow> imp } if (storeMvccVersion()) { - long mvccUpdateTopVer = row.mvccUpdateTopologyVersion(); + long mvccCrdVer = row.mvccCoordinatorVersion(); long mvccUpdateCntr = row.mvccUpdateCounter(); - assert mvccUpdateTopVer > 0 : mvccUpdateCntr; + assert mvccCrdVer > 0 : mvccCrdVer; assert mvccUpdateCntr != CacheCoordinatorsSharedManager.COUNTER_NA; - PageUtils.putLong(pageAddr, off, mvccUpdateTopVer); + PageUtils.putLong(pageAddr, off, mvccCrdVer); off += 8; PageUtils.putLong(pageAddr, off, mvccUpdateCntr); http://git-wip-us.apache.org/repos/asf/ignite/blob/085a3219/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java index 1fcf8dd..3bd0b02 
100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java @@ -114,7 +114,7 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> { /** {@inheritDoc} */ @Override protected int compare(BPlusIO<CacheSearchRow> iox, long pageAddr, int idx, CacheSearchRow row) throws IgniteCheckedException { - assert !grp.mvccEnabled() || row.mvccUpdateTopologyVersion() != 0;// || row.getClass() == SearchRow.class; + assert !grp.mvccEnabled() || row.mvccCoordinatorVersion() != 0;// || row.getClass() == SearchRow.class; RowLinkIO io = (RowLinkIO)iox; @@ -158,9 +158,9 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> { if (cmp != 0 || !grp.mvccEnabled()) return 0; - long mvccTopVer = io.getMvccUpdateTopologyVersion(pageAddr, idx); + long mvccCrdVer = io.getMvccUpdateTopologyVersion(pageAddr, idx); - cmp = Long.compare(row.mvccUpdateTopologyVersion(), mvccTopVer); + cmp = Long.compare(row.mvccCoordinatorVersion(), mvccCrdVer); if (cmp != 0) return cmp; http://git-wip-us.apache.org/repos/asf/ignite/blob/085a3219/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java index 17cc9e0..a3d2ec4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java @@ -28,7 +28,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; */ public class MvccDataRow extends DataRow { /** */ - private long mvccTopVer; + private long crdVer; 
/** */ private long mvccCntr; @@ -39,16 +39,16 @@ public class MvccDataRow extends DataRow { * @param link * @param part * @param rowData - * @param mvccTopVer + * @param crdVer * @param mvccCntr */ - public MvccDataRow(CacheGroupContext grp, int hash, long link, int part, RowData rowData, long mvccTopVer, long mvccCntr) { + public MvccDataRow(CacheGroupContext grp, int hash, long link, int part, RowData rowData, long crdVer, long mvccCntr) { super(grp, hash, link, part, rowData); - assert mvccTopVer > 0 : mvccTopVer; + assert crdVer > 0 : crdVer; assert mvccCntr != CacheCoordinatorsSharedManager.COUNTER_NA; - this.mvccTopVer = mvccTopVer; + this.crdVer = crdVer; this.mvccCntr = mvccCntr; } @@ -64,17 +64,17 @@ public class MvccDataRow extends DataRow { GridCacheVersion ver, int part, int cacheId, - long mvccTopVer, + long crdVer, long mvccCntr) { super(key, val, ver, part, 0L, cacheId); this.mvccCntr = mvccCntr; - this.mvccTopVer = mvccTopVer; + this.crdVer = crdVer; } /** {@inheritDoc} */ - @Override public long mvccUpdateTopologyVersion() { - return mvccTopVer; + @Override public long mvccCoordinatorVersion() { + return crdVer; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/085a3219/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccSearchRow.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccSearchRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccSearchRow.java index ae3da98..e6c5268 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccSearchRow.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccSearchRow.java @@ -24,7 +24,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject; */ public class MvccSearchRow extends SearchRow { /** */ - private long 
mvccTopVer; + private long crdVer; /** */ private long mvccCntr; @@ -32,19 +32,19 @@ public class MvccSearchRow extends SearchRow { /** * @param cacheId * @param key - * @param mvccTopVer + * @param crdVer * @param mvccCntr */ - public MvccSearchRow(int cacheId, KeyCacheObject key, long mvccTopVer, long mvccCntr) { + public MvccSearchRow(int cacheId, KeyCacheObject key, long crdVer, long mvccCntr) { super(cacheId, key); - this.mvccTopVer = mvccTopVer; + this.crdVer = crdVer; this.mvccCntr = mvccCntr; } /** {@inheritDoc} */ - @Override public long mvccUpdateTopologyVersion() { - return mvccTopVer; + @Override public long mvccCoordinatorVersion() { + return crdVer; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/085a3219/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java index 77bcfec..6ab80d0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java @@ -76,7 +76,7 @@ public class SearchRow implements CacheSearchRow { } /** {@inheritDoc} */ - @Override public long mvccUpdateTopologyVersion() { + @Override public long mvccCoordinatorVersion() { return 0; } http://git-wip-us.apache.org/repos/asf/ignite/blob/085a3219/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java index e7478dc..e3b751e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java @@ -37,6 +37,7 @@ import org.apache.ignite.IgniteTransactions; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; @@ -50,7 +51,9 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; @@ -354,6 +357,95 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { /** * @throws Exception If failed. 
*/ + public void testWaitPreviousTxAck() throws Exception { + testSpi = true; + + startGrid(0); + + client = true; + + final Ignite ignite = startGrid(1); + + final IgniteCache<Object, Object> cache = + ignite.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, 16)); + + try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(1, 1); + cache.put(2, 1); + cache.put(3, 1); + + tx.commit(); + } + + TestRecordingCommunicationSpi clientSpi = TestRecordingCommunicationSpi.spi(ignite); + + clientSpi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { + /** */ + boolean block = true; + + @Override public boolean apply(ClusterNode node, Message msg) { + if (block && msg instanceof CoordinatorTxAckRequest) { + block = false; + + return true; + } + + return false; + } + }); + + IgniteInternalFuture<?> txFut1 = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(2, 2); + cache.put(3, 2); + + tx.commit(); + } + + return null; + } + }); + + IgniteInternalFuture<?> txFut2 = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(1, 3); + cache.put(2, 3); + + tx.commit(); + } + + // Should see changes made by both tx1 and tx2. + Map<Object, Object> res = cache.getAll(F.asSet(1, 2, 3)); + + assertEquals(3, res.get(1)); + assertEquals(3, res.get(2)); + assertEquals(2, res.get(3)); + + return null; + } + }); + + clientSpi.waitForBlocked(); + + Thread.sleep(1000); + + clientSpi.stopBlock(true); + + txFut1.get(); + txFut2.get(); + + Map<Object, Object> res = cache.getAll(F.asSet(1, 2, 3)); + + assertEquals(3, res.get(1)); + assertEquals(3, res.get(2)); + assertEquals(2, res.get(3)); + } + + /** + * @throws Exception If failed. 
+ */ public void testPartialCommitGetAll() throws Exception { testSpi = true; @@ -424,6 +516,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } Set<Integer> keys = new HashSet<>(); + keys.add(key1); keys.add(key2); http://git-wip-us.apache.org/repos/asf/ignite/blob/085a3219/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java index 384f7b9..6ae5d6b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java @@ -427,12 +427,12 @@ public class FreeListImplSelfTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Override public long mvccUpdateCounter() { + @Override public long mvccCoordinatorVersion() { return 0; } /** {@inheritDoc} */ - @Override public long mvccUpdateTopologyVersion() { + @Override public long mvccUpdateCounter() { return 0; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/085a3219/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java index e3144b3..02e4df8 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java @@ -181,7 +181,7 @@ public abstract 
class GridH2Row implements GridSearchRowPointer, CacheDataRow, R } /** {@inheritDoc} */ - @Override public long mvccUpdateTopologyVersion() { + @Override public long mvccCoordinatorVersion() { throw new UnsupportedOperationException(); }
