Repository: ignite Updated Branches: refs/heads/ignite-3478 30421e399 -> 54b871422
ignite-6149 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/54b87142 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/54b87142 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/54b87142 Branch: refs/heads/ignite-3478 Commit: 54b871422625636a54a14efffe39a49c192af071 Parents: 30421e3 Author: sboikov <[email protected]> Authored: Mon Sep 18 17:31:37 2017 +0300 Committer: sboikov <[email protected]> Committed: Mon Sep 18 17:31:37 2017 +0300 ---------------------------------------------------------------------- .../distributed/dht/GridDhtTxPrepareFuture.java | 16 ++++- .../GridNearPessimisticTxPrepareFuture.java | 15 ++++- .../mvcc/CacheCoordinatorsSharedManager.java | 38 +++++++----- .../cache/mvcc/MvccResponseListener.java | 29 +++++++++ .../cache/mvcc/CacheMvccTransactionsTest.java | 64 ++++++++++++++++++++ 5 files changed, 144 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/54b87142/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 42c2914..0fe17a8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -61,6 +61,8 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTx import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; +import org.apache.ignite.internal.processors.cache.mvcc.MvccResponseListener; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; @@ -104,7 +106,7 @@ import static org.apache.ignite.transactions.TransactionState.PREPARED; */ @SuppressWarnings("unchecked") public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<IgniteInternalTx, GridNearTxPrepareResponse> - implements GridCacheVersionedFuture<GridNearTxPrepareResponse>, IgniteDiagnosticAware { + implements GridCacheVersionedFuture<GridNearTxPrepareResponse>, IgniteDiagnosticAware, MvccResponseListener { /** */ private static final long serialVersionUID = 0L; @@ -1239,7 +1241,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite if (crd.isLocal()) tx.mvccCoordinatorVersion(cctx.coordinators().requestTxCounterOnCoordinator(tx)); else { - IgniteInternalFuture<Long> crdCntrFut = cctx.coordinators().requestTxCounter(crd, tx); + IgniteInternalFuture<Long> crdCntrFut = cctx.coordinators().requestTxCounter(crd, this, tx.nearXidVersion()); if (tx.onePhaseCommit()) waitCrdCntrFut = crdCntrFut; @@ -1299,6 +1301,16 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite } } + /** {@inheritDoc} */ + @Override public void onMvccResponse(MvccCoordinatorVersion res) { + tx.mvccCoordinatorVersion(res); + } + + /** {@inheritDoc} */ + @Override public void onMvccError(IgniteCheckedException e) { + // TODO IGNITE-3478. + } + /** * */ http://git-wip-us.apache.org/repos/asf/ignite/blob/54b87142/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java index f55bb28..8247b46 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java @@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping; import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager; import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; +import org.apache.ignite.internal.processors.cache.mvcc.MvccResponseListener; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; @@ -57,7 +58,7 @@ import static org.apache.ignite.transactions.TransactionState.PREPARING; /** * */ -public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureAdapter { +public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureAdapter implements MvccResponseListener { /** * @param cctx Context. * @param tx Transaction. @@ -432,7 +433,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA tx.mvccCoordinatorVersion(mvccVer); } else { - IgniteInternalFuture<Long> cntrFut = cctx.coordinators().requestTxCounter(mvccCrd, tx); + IgniteInternalFuture<Long> cntrFut = cctx.coordinators().requestTxCounter(mvccCrd, this, tx.nearXidVersion()); add((IgniteInternalFuture)cntrFut); } @@ -442,6 +443,16 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA } /** {@inheritDoc} */ + @Override public void onMvccResponse(MvccCoordinatorVersion res) { + tx.mvccCoordinatorVersion(res); + } + + /** {@inheritDoc} */ + @Override public void onMvccError(IgniteCheckedException e) { + ERR_UPD.compareAndSet(GridNearPessimisticTxPrepareFuture.this, null, e); + } + + /** {@inheritDoc} */ @Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) { return false; } http://git-wip-us.apache.org/repos/asf/ignite/blob/54b87142/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java index b3cf54e..0f7e71e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java @@ -149,27 +149,26 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager /** * @param crd Coordinator. - * @param tx Transaction. + * @param lsnr Response listener. * @return Counter request future. */ - public IgniteInternalFuture<MvccCoordinatorVersion> requestTxCounter(ClusterNode crd, GridDhtTxLocalAdapter tx) { + public IgniteInternalFuture<MvccCoordinatorVersion> requestTxCounter(ClusterNode crd, MvccResponseListener lsnr, GridCacheVersion txVer) { assert !crd.isLocal() : crd; MvccVersionFuture fut = new MvccVersionFuture(futIdCntr.incrementAndGet(), crd, - tx); + lsnr); verFuts.put(fut.id, fut); try { cctx.gridIO().sendToGridTopic(crd, MSG_TOPIC, - new CoordinatorTxCounterRequest(fut.id, tx.nearXidVersion()), + new CoordinatorTxCounterRequest(fut.id, txVer), MSG_POLICY); } catch (IgniteCheckedException e) { - if (verFuts.remove(fut.id) != null) - fut.onDone(e); + fut.onError(e); } return fut; @@ -679,7 +678,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager private final Long id; /** */ - private GridDhtTxLocalAdapter tx; + private MvccResponseListener lsnr; /** */ public final ClusterNode crd; @@ -691,10 +690,10 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager * @param id Future ID. * @param crd Coordinator. */ - MvccVersionFuture(Long id, ClusterNode crd, @Nullable GridDhtTxLocalAdapter tx) { + MvccVersionFuture(Long id, ClusterNode crd, @Nullable MvccResponseListener lsnr) { this.id = id; this.crd = crd; - this.tx = tx; + this.lsnr = lsnr; if (STAT_CNTRS) startTime = System.nanoTime(); @@ -706,19 +705,30 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager void onResponse(MvccCoordinatorVersionResponse res) { assert res.counter() != COUNTER_NA; - if (tx != null) - tx.mvccCoordinatorVersion(res); + if (lsnr != null) + lsnr.onMvccResponse(res); onDone(res); } + void onError(IgniteCheckedException err) { + if (verFuts.remove(id) != null) { + if (lsnr != null) + lsnr.onMvccError(err); + + onDone(err); + } + } + /** * @param nodeId Failed node ID. */ void onNodeLeft(UUID nodeId) { - if (crd.id().equals(nodeId) && verFuts.remove(id) != null) { - onDone(new ClusterTopologyCheckedException("Failed to request coordinator version, " + - "coordinator failed: " + nodeId)); + if (crd.id().equals(nodeId)) { + ClusterTopologyCheckedException err = new ClusterTopologyCheckedException("Failed to request coordinator version, " + + "coordinator failed: " + nodeId); + + onError(err); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/54b87142/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccResponseListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccResponseListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccResponseListener.java new file mode 100644 index 0000000..11d0da0 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccResponseListener.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import org.apache.ignite.IgniteCheckedException; + +/** + * + */ +public interface MvccResponseListener { + public void onMvccResponse(MvccCoordinatorVersion res); + + public void onMvccError(IgniteCheckedException e); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/54b87142/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java index f724afb..11980a9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java @@ -38,6 +38,7 @@ import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.cluster.ClusterTopologyException; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; @@ -51,6 +52,7 @@ import org.apache.ignite.internal.util.lang.GridInClosure3; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.PA; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiPredicate; @@ -1352,6 +1354,68 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testCoordinatorFailurePessimisticTx() throws Exception { + testSpi = true; + + startGrids(3); + + client = true; + + final Ignite client = startGrid(3); + + final IgniteCache cache = client.createCache( + cacheConfiguration(PARTITIONED, FULL_SYNC, 0, DFLT_PARTITION_COUNT)); + + final Integer key1 = primaryKey(jcache(1)); + final Integer key2 = primaryKey(jcache(2)); + + TestRecordingCommunicationSpi crdSpi = TestRecordingCommunicationSpi.spi(ignite(0)); + + crdSpi.blockMessages(MvccCoordinatorVersionResponse.class, client.name()); + + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { + try { + try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(key1, 1); + cache.put(key2, 2); + + tx.commit(); + } + + fail(); + } + catch (ClusterTopologyException e) { + info("Expected exception: " + e); + } + + return null; + } + }, "tx-thread"); + + crdSpi.waitForBlocked(); + + stopGrid(0); + + fut.get(); + + assertNull(cache.get(key1)); + assertNull(cache.get(key2)); + + try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(key1, 1); + cache.put(key2, 2); + + tx.commit(); + } + + assertEquals(1, cache.get(key1)); + assertEquals(2, cache.get(key2)); + } + + /** * @param N Number of object to update in single transaction. * @param srvs Number of server nodes. * @param clients Number of client nodes.
