Repository: ignite Updated Branches: refs/heads/ignite-1607 820c0ecd3 -> 920d7472d
ignite-1607 WIP Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/920d7472 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/920d7472 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/920d7472 Branch: refs/heads/ignite-1607 Commit: 920d7472d52c45765990269717f0b11563b2575e Parents: 820c0ec Author: sboikov <[email protected]> Authored: Mon Oct 5 12:29:45 2015 +0300 Committer: sboikov <[email protected]> Committed: Mon Oct 5 16:10:02 2015 +0300 ---------------------------------------------------------------------- ...arOptimisticSerializableTxPrepareFuture.java | 183 ++++- .../near/GridNearOptimisticTxPrepareFuture.java | 18 +- .../GridNearPessimisticTxPrepareFuture.java | 8 +- .../cache/distributed/near/GridNearTxLocal.java | 7 + .../near/GridNearTxPrepareFutureAdapter.java | 12 +- .../CacheSerializableTransactionsTest.java | 697 +++++++++++++++++++ .../cache/CacheSerializableTxTest.java | 687 ------------------ ...niteCacheClientNodeChangingTopologyTest.java | 147 ++++ .../testsuites/IgniteCacheTestSuite4.java | 3 + 9 files changed, 1019 insertions(+), 743 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/920d7472/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java index 62fc40f..4a7efb4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.cluster.ClusterTopologyException; @@ -46,6 +47,8 @@ import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedExceptio import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; import org.apache.ignite.internal.util.GridConcurrentHashSet; +import org.apache.ignite.internal.util.future.GridCompoundFuture; +import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -58,6 +61,7 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteClosure; +import org.apache.ignite.lang.IgniteReducer; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; @@ -74,6 +78,9 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre @GridToStringInclude private Collection<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>(); + /** */ + private final AtomicReference<ClientRemapFuture> remapFutRef; + /** * @param cctx Context. * @param tx Transaction. @@ -85,6 +92,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre // Should wait for all mini futures completion before finishing tx. 
ignoreChildFailures(IgniteCheckedException.class); + + remapFutRef = cctx.kernalContext().clientNode() ? new AtomicReference<ClientRemapFuture>() : null; } /** {@inheritDoc} */ @@ -158,8 +167,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre if (e instanceof IgniteTxOptimisticCheckedException && nodeId != null) tx.onOptimisticException(nodeId); - if (err.compareAndSet(null, e)) - tx.setRollbackOnly(); + err.compareAndSet(null, e); } /** @@ -183,7 +191,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre /** {@inheritDoc} */ @Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) { if (!isDone()) { - for (IgniteInternalFuture<IgniteInternalTx> fut : pending()) { + for (IgniteInternalFuture<GridNearTxPrepareResponse> fut : pending()) { if (isMini(fut)) { MiniFuture f = (MiniFuture)fut; @@ -230,6 +238,9 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre tx.state(PREPARED); if (super.onDone(tx, err0)) { + if (err0 != null) + tx.setRollbackOnly(); + // Don't forget to clean up. 
cctx.mvcc().removeFuture(this); @@ -331,18 +342,18 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre } else { topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override - public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) { + @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) { cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() { - @Override - public void run() { + @Override public void run() { try { fut.get(); prepareOnTopology(remap, c); - } catch (IgniteCheckedException e) { + } + catch (IgniteCheckedException e) { onDone(e); - } finally { + } + finally { cctx.txContextReset(); } } @@ -435,7 +446,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre return; } - prepare(tx.readEntries(), tx.writeEntries()); + prepare(tx.readEntries(), tx.writeEntries(), remap); markInitialized(); } @@ -447,11 +458,14 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre /** * @param reads Read entries. * @param writes Write entries. + * @param remap Remap flag. * @throws IgniteCheckedException If failed. 
*/ + @SuppressWarnings("unchecked") private void prepare( Iterable<IgniteTxEntry> reads, - Iterable<IgniteTxEntry> writes + Iterable<IgniteTxEntry> writes, + boolean remap ) throws IgniteCheckedException { AffinityTopologyVersion topVer = tx.topologyVersion(); @@ -496,19 +510,37 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre checkOnePhase(); for (GridDistributedTxMapping m : mappings.values()) { - if (!prepare(m)) + assert !m.empty(); + + MiniFuture fut = new MiniFuture(m); + + add(fut); + } + + Collection<MiniFuture> futs = (Collection)futures(); + + for (MiniFuture fut : futs) { + if (remap && fut.rcvRes.get()) + continue; + + IgniteCheckedException err = prepare(fut); + + if (err != null) { + onDone(err); + break; + } } markInitialized(); } /** - * @param m Mapping. + * @param fut Mini future. * @return {@code False} if should stop mapping. */ - private boolean prepare(GridDistributedTxMapping m) { - assert !m.empty(); + private IgniteCheckedException prepare(final MiniFuture fut) { + GridDistributedTxMapping m = fut.mapping(); final ClusterNode n = m.node(); @@ -545,16 +577,12 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre catch (IgniteCheckedException e) { onError(m.node().id(), e); - return false; + return e; } } - final MiniFuture fut = new MiniFuture(m); - req.miniId(fut.futureId()); - add(fut); // Append new future. - // If this is the primary node for the keys. 
if (n.isLocal()) { IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = cctx.tm().txHandler().prepareTx(n.id(), tx, req); @@ -579,16 +607,16 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre fut.onResult(e); - return false; + return e; } catch (IgniteCheckedException e) { fut.onResult(e); - return false; + return e; } } - return true; + return null; } /** @@ -701,7 +729,37 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre /** * */ - private class MiniFuture extends GridFutureAdapter<IgniteInternalTx> { + private class ClientRemapFuture extends GridCompoundFuture<GridNearTxPrepareResponse, Boolean> { + /** */ + private boolean remap = true; + + /** + * + */ + public ClientRemapFuture() { + super(); + + reducer(new IgniteReducer<GridNearTxPrepareResponse, Boolean>() { + @Override public boolean collect(GridNearTxPrepareResponse res) { + assert res != null; + + if (res.clientRemapVersion() == null) + remap = false; + + return true; + } + + @Override public Boolean reduce() { + return remap; + } + }); + } + } + + /** + * + */ + private class MiniFuture extends GridFutureAdapter<GridNearTxPrepareResponse> { /** */ private static final long serialVersionUID = 0L; @@ -782,7 +840,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre * @param nodeId Failed node ID. * @param res Result callback. 
*/ - void onResult(UUID nodeId, GridNearTxPrepareResponse res) { + void onResult(UUID nodeId, final GridNearTxPrepareResponse res) { if (isDone()) return; @@ -798,42 +856,93 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre assert cctx.kernalContext().clientNode(); assert m.clientFirst(); - IgniteInternalFuture<?> affFut = cctx.exchange().affinityReadyFuture(res.clientRemapVersion()); + tx.onClientRemap(m.node().id()); - if (affFut != null && !affFut.isDone()) { - affFut.listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> fut) { - try { - fut.get(); + ClientRemapFuture remapFut = new ClientRemapFuture(); + + if (remapFutRef.compareAndSet(null, remapFut)) { + Collection<MiniFuture> futs = (Collection)futures(); - remap(); + for (MiniFuture fut : futs) { + if (fut != this) + remapFut.add(fut); + } + + remapFut.markInitialized(); + + remapFut.listen(new CI1<IgniteInternalFuture<Boolean>>() { + @Override public void apply(IgniteInternalFuture<Boolean> remapFut) { + try { + IgniteInternalFuture<?> affFut = + cctx.exchange().affinityReadyFuture(res.clientRemapVersion()); + + if (affFut == null) + affFut = new GridFinishedFuture<Object>(); + + if (remapFut.get()) { + if (log.isDebugEnabled()) { + log.debug("Will remap client tx [" + + "fut=" + GridNearOptimisticSerializableTxPrepareFuture.this + + ", topVer=" + res.topologyVersion() + ']'); + } + + boolean set = remapFutRef.compareAndSet((ClientRemapFuture)remapFut, null); + + assert set; + + affFut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> affFut) { + try { + affFut.get(); + + remap(res); + } + catch (IgniteCheckedException e) { + onDone(e); + } + } + }); + } + else { + ClusterTopologyCheckedException err = new ClusterTopologyCheckedException( + "Cluster topology changed while client transaction is preparing."); + + err.retryReadyFuture(affFut); + + onDone(err); + } } catch 
(IgniteCheckedException e) { + if (log.isDebugEnabled()) { + log.debug("Prepare failed, will not remap tx: " + + GridNearOptimisticSerializableTxPrepareFuture.this); + } + onDone(e); } } }); } else - remap(); + onDone(res); } else { onPrepareResponse(m, res); // Finish this mini future. - onDone(tx); + onDone(res); } } } } /** - * + * @param res Response. */ - private void remap() { + private void remap(final GridNearTxPrepareResponse res) { prepareOnTopology(true, new Runnable() { @Override public void run() { - onDone(tx); + onDone(res); } }); } http://git-wip-us.apache.org/repos/asf/ignite/blob/920d7472/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index 4a0caa9..0646aac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -199,7 +199,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd /** {@inheritDoc} */ @Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) { if (!isDone()) { - for (IgniteInternalFuture<IgniteInternalTx> fut : pending()) { + for (IgniteInternalFuture<GridNearTxPrepareResponse> fut : pending()) { if (isMini(fut)) { MiniFuture f = (MiniFuture)fut; @@ -740,7 +740,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd /** * */ - private class MiniFuture extends GridFutureAdapter<IgniteInternalTx> { + private class MiniFuture 
extends GridFutureAdapter<GridNearTxPrepareResponse> { /** */ private static final long serialVersionUID = 0L; @@ -827,7 +827,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd * @param nodeId Failed node ID. * @param res Result callback. */ - void onResult(UUID nodeId, GridNearTxPrepareResponse res) { + void onResult(UUID nodeId, final GridNearTxPrepareResponse res) { if (isDone()) return; @@ -849,7 +849,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd try { fut.get(); - remap(); + remap(res); } catch (IgniteCheckedException e) { onDone(e); @@ -858,7 +858,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd }); } else - remap(); + remap(res); } else { onPrepareResponse(m, res); @@ -868,19 +868,19 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd proceedPrepare(mappings); // Finish this mini future. - onDone(tx); + onDone(res); } } } } /** - * + * @param res Response. 
*/ - private void remap() { + private void remap(final GridNearTxPrepareResponse res) { prepareOnTopology(true, new Runnable() { @Override public void run() { - onDone(tx); + onDone(res); } }); } http://git-wip-us.apache.org/repos/asf/ignite/blob/920d7472/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java index b8d2250..3da4340 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java @@ -103,7 +103,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA if (!isDone()) { assert res.clientRemapVersion() == null : res; - for (IgniteInternalFuture<IgniteInternalTx> fut : pending()) { + for (IgniteInternalFuture<GridNearTxPrepareResponse> fut : pending()) { MiniFuture f = (MiniFuture)fut; if (f.futureId().equals(res.miniId())) { @@ -291,7 +291,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA /** * */ - private class MiniFuture extends GridFutureAdapter<IgniteInternalTx> { + private class MiniFuture extends GridFutureAdapter<GridNearTxPrepareResponse> { /** */ private static final long serialVersionUID = 0L; @@ -331,7 +331,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA else { onPrepareResponse(m, res); - onDone(tx); + onDone(res); } } @@ -343,7 +343,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA 
tx.markForBackupCheck(); // Do not fail future for one-phase transaction right away. - onDone(tx); + onDone((GridNearTxPrepareResponse)null); } onError(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/920d7472/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 721de47..bebd9d1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -595,6 +595,13 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { } /** + * @param nodeId Primary node id. + */ + void onClientRemap(UUID nodeId) { + mappings.remove(nodeId); + } + + /** * @param nodeId Node ID to mark with explicit lock. * @return {@code True} if mapping was found. 
*/ http://git-wip-us.apache.org/repos/asf/ignite/blob/920d7472/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java index fac7a12..c9ea42a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java @@ -34,7 +34,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture; +import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; @@ -48,15 +48,15 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOO /** * Common code for tx prepare in optimistic and pessimistic modes. */ -public abstract class GridNearTxPrepareFutureAdapter extends GridCompoundIdentityFuture<IgniteInternalTx> +public abstract class GridNearTxPrepareFutureAdapter extends GridCompoundFuture<GridNearTxPrepareResponse, IgniteInternalTx> implements GridCacheFuture<IgniteInternalTx> { /** Logger reference. 
*/ protected static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); /** */ - private static final IgniteReducer<IgniteInternalTx, IgniteInternalTx> REDUCER = - new IgniteReducer<IgniteInternalTx, IgniteInternalTx>() { - @Override public boolean collect(IgniteInternalTx e) { + private static final IgniteReducer<GridNearTxPrepareResponse, IgniteInternalTx> REDUCER = + new IgniteReducer<GridNearTxPrepareResponse, IgniteInternalTx>() { + @Override public boolean collect(GridNearTxPrepareResponse e) { return true; } @@ -94,7 +94,7 @@ public abstract class GridNearTxPrepareFutureAdapter extends GridCompoundIdentit * @param tx Transaction. */ public GridNearTxPrepareFutureAdapter(GridCacheSharedContext cctx, final GridNearTxLocal tx) { - super(cctx.kernalContext(), REDUCER); + super(REDUCER); assert cctx != null; assert tx != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/920d7472/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java new file mode 100644 index 0000000..68899e7 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java @@ -0,0 +1,697 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.Cache; +import javax.cache.configuration.Factory; +import javax.cache.integration.CacheLoaderException; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteTransactions; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.cache.store.CacheStoreAdapter; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import 
org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionOptimisticException; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; +import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; + +/** + * + */ +public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int SRVS = 4; + + /** */ + private static final int CLIENTS = 3; + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + cfg.setClientMode(client); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(SRVS); + + client = true; + + startGridsMultiThreaded(SRVS, CLIENTS); + + client = false; + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 5 * 60_000; + } + + /** + * @throws Exception If failed. 
+ */ + public void _testTxRollbackRead1() throws Exception { + txRollbackRead(true); + } + + /** + * @throws Exception If failed. + */ + public void _testTxRollbackRead2() throws Exception { + txRollbackRead(false); + } + + /** + * @param noVal If {@code true} there is no cache value when read in tx. + * @throws Exception If failed. + */ + private void txRollbackRead(boolean noVal) throws Exception { + Ignite ignite0 = ignite(0); + + final IgniteTransactions txs = ignite0.transactions(); + + for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg); + + List<Integer> keys = new ArrayList<>(); + + keys.add(nearKey(cache)); + + for (Integer key : keys) { + log.info("Test key: " + key); + + Integer expVal = null; + + if (!noVal) { + expVal = -1; + + cache.put(key, expVal); + } + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = cache.get(key); + + assertEquals(expVal, val); + + updateKey(cache, key, 1); + + log.info("Commit"); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + assertEquals(1, (Object) cache.get(key)); + } + } + finally { + ignite0.destroyCache(ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. 
+ */ + public void _testTxRollbackReadWrite() throws Exception { + Ignite ignite0 = ignite(0); + + final IgniteTransactions txs = ignite0.transactions(); + + final IgniteCache<Integer, Integer> cache = + ignite0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false)); + + final Integer key = nearKey(cache); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = cache.get(key); + + assertNull(val); + + updateKey(cache, key, 1); + + cache.put(key, 2); + + log.info("Commit"); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + assertEquals(1, (Object)cache.get(key)); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache.put(key, 2); + + tx.commit(); + } + + assertEquals(2, (Object) cache.get(key)); + } + + /** + * @throws Exception If failed. + */ + public void testTxRollbackIfLocked1() throws Exception { + Ignite ignite0 = ignite(0); + + IgniteTransactions txs = ignite0.transactions(); + + for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg); + + final Integer key = nearKey(cache); + + CountDownLatch latch = new CountDownLatch(1); + + IgniteInternalFuture<?> fut = lockKey(latch, cache, key); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache.put(key, 2); + + log.info("Commit"); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + latch.countDown(); + + fut.get(); + + assertEquals(1, (Object)cache.get(key)); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache.put(key, 2); + + tx.commit(); + } + + assertEquals(2, (Object)cache.get(key)); + } + finally { + ignite0.destroyCache(ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. 
+     */
+    public void testTxRollbackIfLocked2() throws Exception {
+        rollbackIfLockedPartialLock(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxRollbackIfLocked3() throws Exception {
+        rollbackIfLockedPartialLock(true);
+    }
+
+    /**
+     * @param locKey If {@code true} the second (not locked) tx key is primary on the local node, otherwise on node 2.
+     * @throws Exception If failed.
+     */
+    public void rollbackIfLockedPartialLock(boolean locKey) throws Exception {
+        Ignite ignite0 = ignite(0);
+
+        final IgniteTransactions txs = ignite0.transactions();
+
+        for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+            logCacheInfo(ccfg);
+
+            try {
+                IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+                final Integer key1 = primaryKey(ignite(1).cache(cache.getName()));
+                final Integer key2 = locKey ? primaryKey(cache) : primaryKey(ignite(2).cache(cache.getName()));
+
+                CountDownLatch latch = new CountDownLatch(1);
+
+                IgniteInternalFuture<?> fut = lockKey(latch, cache, key1);
+
+                try {
+                    try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                        cache.put(key1, 2);
+                        cache.put(key2, 2);
+
+                        log.info("Commit2");
+
+                        tx.commit();
+                    }
+
+                    fail();
+                }
+                catch (TransactionOptimisticException e) {
+                    log.info("Expected exception: " + e);
+                }
+
+                latch.countDown();
+
+                fut.get();
+
+                assertEquals(1, (Object) cache.get(key1));
+                assertNull(cache.get(key2));
+
+                try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                    cache.put(key1, 2);
+                    cache.put(key2, 2);
+
+                    log.info("Commit3");
+
+                    tx.commit();
+                }
+
+                assertEquals(2, (Object) cache.get(key1));
+                assertEquals(2, (Object) cache.get(key2));
+            }
+            finally {
+                ignite0.destroyCache(ccfg.getName());
+            }
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConcurrentUpdateNoDeadlock() throws Exception {
+        concurrentUpdateNoDeadlock(Collections.singletonList(ignite(0)), 10, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+ */ + public void testConcurrentUpdateNoDeadlockNodeRestart() throws Exception { + concurrentUpdateNoDeadlock(Collections.singletonList(ignite(1)), 10, true); + } + + /** + * @throws Exception If failed. + */ + public void testConcurrentUpdateNoDeadlockClients() throws Exception { + concurrentUpdateNoDeadlock(clients(), 20, false); + } + + /** + * @throws Exception If failed. + */ + public void testConcurrentUpdateNoDeadlockClientsNodeRestart() throws Exception { + concurrentUpdateNoDeadlock(clients(), 20, true); + } + + /** + * @return Client nodes. + */ + private List<Ignite> clients() { + List<Ignite> clients = new ArrayList<>(); + + for (int i = 0; i < CLIENTS; i++) { + Ignite ignite = ignite(SRVS + i); + + assertTrue(ignite.configuration().isClientMode()); + + clients.add(ignite); + } + + return clients; + } + + /** + * @param updateNodes Nodes executing updates. + * @param threads Number of threads executing updates. + * @param restart If {@code true} restarts one node. + * @throws Exception If failed. 
+ */ + private void concurrentUpdateNoDeadlock(final List<Ignite> updateNodes, + int threads, + final boolean restart) throws Exception { + assert updateNodes.size() > 0; + + final Ignite ignite0 = ignite(0); + + final String cacheName = + ignite0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false)).getName(); + + try { + final int KEYS = 100; + + final AtomicBoolean finished = new AtomicBoolean(); + + IgniteInternalFuture<Object> fut = null; + + try { + if (restart) { + fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + while (!finished.get()) { + stopGrid(0); + + U.sleep(300); + + Ignite ignite = startGrid(0); + + assertFalse(ignite.configuration().isClientMode()); + } + + return null; + } + }); + } + + for (int i = 0; i < 10; i++) { + log.info("Iteration: " + i); + + final long stopTime = U.currentTimeMillis() + 10_000; + + final AtomicInteger idx = new AtomicInteger(); + + IgniteInternalFuture<?> updateFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + int nodeIdx = idx.getAndIncrement() % updateNodes.size(); + + Ignite node = updateNodes.get(nodeIdx); + + log.info("Tx thread: " + node.name()); + + final IgniteTransactions txs = node.transactions(); + + final IgniteCache<Integer, Integer> cache = node.cache(cacheName); + + assertNotNull(cache); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + while (U.currentTimeMillis() < stopTime) { + final Map<Integer, Integer> keys = new LinkedHashMap<>(); + + for (int i = 0; i < KEYS / 2; i++) + keys.put(rnd.nextInt(KEYS), rnd.nextInt()); + + try { + if (restart) { + doInTransaction(node, OPTIMISTIC, SERIALIZABLE, new Callable<Void>() { + @Override public Void call() throws Exception { + cache.putAll(keys); + + return null; + } + }); + } + else { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache.putAll(keys); + + tx.commit(); + } + } + } + catch 
(TransactionOptimisticException ignore) { + // No-op. + } + catch (Throwable e) { + log.error("Unexpected error: " + e, e); + + throw e; + } + } + + return null; + } + }, threads, "tx-thread"); + + updateFut.get(60, SECONDS); + + IgniteCache<Integer, Integer> cache = ignite(1).cache(cacheName); + + for (int key = 0; key < KEYS; key++) { + Integer val = cache.get(key); + + for (int node = 1; node < SRVS + CLIENTS; node++) + assertEquals(val, ignite(node).cache(cache.getName()).get(key)); + } + } + + finished.set(true); + + if (fut != null) + fut.get(); + } + finally { + finished.set(true); + } + } + finally { + ignite(1).destroyCache(cacheName); + } + } + + /** + * @return Cache configurations. + */ + private List<CacheConfiguration<Integer, Integer>> cacheConfigurations() { + List<CacheConfiguration<Integer, Integer>> ccfgs = new ArrayList<>(); + + // No store, no near. + ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, false, false)); + ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false)); + ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 2, false, false)); + + // Store, no near. + ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, true, false)); + ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, true, false)); + ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 2, true, false)); + + // No store, near. + ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, false, true)); + ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, true)); + ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 2, false, true)); + + // Store, near. + ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, true, true)); + ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, true, true)); + ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 2, true, true)); + + return ccfgs; + } + + /** + * @param ccfg Cache configuration. 
+ */ + private void logCacheInfo(CacheConfiguration<?, ?> ccfg) { + log.info("Test cache [mode=" + ccfg.getCacheMode() + + ", sync=" + ccfg.getWriteSynchronizationMode() + + ", backups=" + ccfg.getBackups() + + ", near=" + (ccfg.getNearConfiguration() != null) + + ", store=" + ccfg.isWriteThrough() + ']'); + } + + /** + * @param cache Cache. + * @param key Key. + * @param val Value. + * @throws Exception If failed. + */ + private void updateKey( + final IgniteCache<Integer, Integer> cache, + final Integer key, + final Integer val) throws Exception { + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + IgniteTransactions txs = cache.unwrap(Ignite.class).transactions(); + + try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(key, val); + + tx.commit(); + } + + return null; + } + }, "update-thread"); + + fut.get(); + } + + /** + * @param releaseLatch Release lock latch. + * @param cache Cache. + * @param key Key. + * @return Future. + * @throws Exception If failed. + */ + private IgniteInternalFuture<?> lockKey( + final CountDownLatch releaseLatch, + final IgniteCache<Integer, Integer> cache, + final Integer key) throws Exception { + final CountDownLatch lockLatch = new CountDownLatch(1); + + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + IgniteTransactions txs = cache.unwrap(Ignite.class).transactions(); + + try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(key, 1); + + log.info("Locked key: " + key); + + lockLatch.countDown(); + + assertTrue(releaseLatch.await(100000, SECONDS)); + + log.info("Commit tx: " + key); + + tx.commit(); + } + + return null; + } + }, "lock-thread"); + + assertTrue(lockLatch.await(10, SECONDS)); + + return fut; + } + + /** + * @param cacheMode Cache mode. + * @param syncMode Write synchronization mode. 
+ * @param backups Number of backups. + * @param storeEnabled If {@code true} adds cache store. + * @param nearCache If {@code true} near cache is enabled. + * @return Cache configuration. + */ + private CacheConfiguration<Integer, Integer> cacheConfiguration( + CacheMode cacheMode, + CacheWriteSynchronizationMode syncMode, + int backups, + boolean storeEnabled, + boolean nearCache) { + CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(); + + ccfg.setCacheMode(cacheMode); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setBackups(backups); + ccfg.setWriteSynchronizationMode(syncMode); + + if (storeEnabled) { + ccfg.setCacheStoreFactory(new TestStoreFactory()); + ccfg.setWriteThrough(true); + } + + if (nearCache) + ccfg.setNearConfiguration(new NearCacheConfiguration<Integer, Integer>()); + + return ccfg; + } + + /** + * + */ + private static class TestStoreFactory implements Factory<CacheStore<Integer, Integer>> { + /** {@inheritDoc} */ + @Override public CacheStore<Integer, Integer> create() { + return new CacheStoreAdapter<Integer, Integer>() { + @Override public Integer load(Integer key) throws CacheLoaderException { + return null; + } + + @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry) { + // No-op. + } + + @Override public void delete(Object key) { + // No-op. 
+ } + }; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/920d7472/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTxTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTxTest.java deleted file mode 100644 index 217e362..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTxTest.java +++ /dev/null @@ -1,687 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.cache; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import javax.cache.Cache; -import javax.cache.configuration.Factory; -import javax.cache.integration.CacheLoaderException; -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCache; -import org.apache.ignite.IgniteTransactions; -import org.apache.ignite.cache.CacheMode; -import org.apache.ignite.cache.CacheWriteSynchronizationMode; -import org.apache.ignite.cache.store.CacheStore; -import org.apache.ignite.cache.store.CacheStoreAdapter; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.configuration.NearCacheConfiguration; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; -import org.apache.ignite.testframework.GridTestUtils; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.apache.ignite.transactions.Transaction; -import org.apache.ignite.transactions.TransactionOptimisticException; - -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; -import static org.apache.ignite.cache.CacheMode.PARTITIONED; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; -import static 
org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; -import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; -import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; -import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; - -/** - * - */ -public class CacheSerializableTxTest extends GridCommonAbstractTest { - /** */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** */ - private static final int SRVS = 4; - - /** */ - private static final int CLIENTS = 3; - - /** */ - private boolean client; - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); - - cfg.setClientMode(client); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - super.beforeTestsStarted(); - - startGridsMultiThreaded(SRVS); - - client = true; - - startGridsMultiThreaded(SRVS, CLIENTS); - - client = false; - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - super.afterTestsStopped(); - - stopAllGrids(); - } - - /** - * @throws Exception If failed. - */ - public void testTxRollbackRead1() throws Exception { - txRollbackRead(true); - } - - /** - * @throws Exception If failed. - */ - public void testTxRollbackRead2() throws Exception { - txRollbackRead(false); - } - - /** - * @param noVal If {@code true} there is no cache value when read in tx. - * @throws Exception If failed. 
- */ - private void txRollbackRead(boolean noVal) throws Exception { - Ignite ignite0 = ignite(0); - - final IgniteTransactions txs = ignite0.transactions(); - - for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) { - logCacheInfo(ccfg); - - try { - IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg); - - List<Integer> keys = new ArrayList<>(); - - keys.add(nearKey(cache)); - - for (Integer key : keys) { - log.info("Test key: " + key); - - Integer expVal = null; - - if (!noVal) { - expVal = -1; - - cache.put(key, expVal); - } - - try { - try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { - Integer val = cache.get(key); - - assertEquals(expVal, val); - - updateKey(cache, key, 1); - - log.info("Commit"); - - tx.commit(); - } - - fail(); - } - catch (TransactionOptimisticException e) { - log.info("Expected exception: " + e); - } - - assertEquals(1, (Object) cache.get(key)); - } - } - finally { - ignite0.destroyCache(ccfg.getName()); - } - } - } - - /** - * @throws Exception If failed. - */ - public void testTxRollbackReadWrite() throws Exception { - Ignite ignite0 = ignite(0); - - final IgniteTransactions txs = ignite0.transactions(); - - final IgniteCache<Integer, Integer> cache = - ignite0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false)); - - final Integer key = nearKey(cache); - - try { - try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { - Integer val = cache.get(key); - - assertNull(val); - - updateKey(cache, key, 1); - - cache.put(key, 2); - - log.info("Commit"); - - tx.commit(); - } - - fail(); - } - catch (TransactionOptimisticException e) { - log.info("Expected exception: " + e); - } - - assertEquals(1, (Object)cache.get(key)); - - try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { - cache.put(key, 2); - - tx.commit(); - } - - assertEquals(2, (Object) cache.get(key)); - } - - /** - * @throws Exception If failed. 
- */ - public void testTxRollbackIfLocked1() throws Exception { - Ignite ignite0 = ignite(0); - - IgniteTransactions txs = ignite0.transactions(); - - for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) { - logCacheInfo(ccfg); - - try { - IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg); - - final Integer key = nearKey(cache); - - CountDownLatch latch = new CountDownLatch(1); - - IgniteInternalFuture<?> fut = lockKey(latch, cache, key); - - try { - try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { - cache.put(key, 2); - - log.info("Commit"); - - tx.commit(); - } - - fail(); - } - catch (TransactionOptimisticException e) { - log.info("Expected exception: " + e); - } - - latch.countDown(); - - fut.get(); - - assertEquals(1, (Object)cache.get(key)); - - try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { - cache.put(key, 2); - - tx.commit(); - } - - assertEquals(2, (Object)cache.get(key)); - } - finally { - ignite0.destroyCache(ccfg.getName()); - } - } - } - - /** - * @throws Exception If failed. - */ - public void testTxRollbackIfLocked2() throws Exception { - rollbackIfLockedPartialLock(false); - } - - /** - * @throws Exception If failed. - */ - public void testTxRollbackIfLocked3() throws Exception { - rollbackIfLockedPartialLock(true); - } - - /** - * @param locKey If {@code true} gets lock for local key. - * @throws Exception If failed. - */ - public void rollbackIfLockedPartialLock(boolean locKey) throws Exception { - Ignite ignite0 = ignite(0); - - final IgniteTransactions txs = ignite0.transactions(); - - for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) { - logCacheInfo(ccfg); - - try { - IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg); - - final Integer key1 = primaryKey(ignite(1).cache(cache.getName())); - final Integer key2 = locKey ? 
primaryKey(cache) : primaryKey(ignite(2).cache(cache.getName())); - - CountDownLatch latch = new CountDownLatch(1); - - IgniteInternalFuture<?> fut = lockKey(latch, cache, key1); - - try { - try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { - cache.put(key1, 2); - cache.put(key2, 2); - - log.info("Commit2"); - - tx.commit(); - } - - fail(); - } - catch (TransactionOptimisticException e) { - log.info("Expected exception: " + e); - } - - latch.countDown(); - - fut.get(); - - assertEquals(1, (Object) cache.get(key1)); - assertNull(cache.get(key2)); - - try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { - cache.put(key1, 2); - cache.put(key2, 2); - - log.info("Commit3"); - - tx.commit(); - } - - assertEquals(2, (Object) cache.get(key2)); - assertEquals(2, (Object) cache.get(key2)); - } - finally { - ignite0.destroyCache(ccfg.getName()); - } - } - } - - /** - * @throws Exception If failed. - */ - public void testConcurrentUpdateNoDeadlock() throws Exception { - concurrentUpdateNoDeadlock(Collections.singletonList(ignite(0)), 10, false); - } - - /** - * @throws Exception If failed. - */ - public void testConcurrentUpdateNoDeadlockNodeRestart() throws Exception { - concurrentUpdateNoDeadlock(Collections.singletonList(ignite(1)), 10, true); - } - - /** - * @throws Exception If failed. - */ - public void testConcurrentUpdateNoDeadlockClients() throws Exception { - concurrentUpdateNoDeadlock(clients(), 20, false); - } - - /** - * @throws Exception If failed. - */ - public void testConcurrentUpdateNoDeadlockClientsNodeRestart() throws Exception { - concurrentUpdateNoDeadlock(clients(), 20, true); - } - - /** - * @return Client nodes. - */ - private List<Ignite> clients() { - List<Ignite> clients = new ArrayList<>(); - - for (int i = 0; i < CLIENTS; i++) { - Ignite ignite = ignite(SRVS + i); - - assertTrue(ignite.configuration().isClientMode()); - - clients.add(ignite); - } - - return clients; - } - - /** - * @param updateNodes Nodes executing updates. 
- * @param threads Number of threads executing updates. - * @param restart If {@code true} restarts one node. - * @throws Exception If failed. - */ - private void concurrentUpdateNoDeadlock(final List<Ignite> updateNodes, - int threads, - final boolean restart) throws Exception { - assert updateNodes.size() > 0; - - final Ignite ignite0 = ignite(0); - - final String cacheName = - ignite0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false)).getName(); - - final int KEYS = 100; - - final AtomicBoolean finished = new AtomicBoolean(); - - IgniteInternalFuture<Object> fut = null; - - try { - if (restart) { - fut = GridTestUtils.runAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - while (!finished.get()) { - stopGrid(0); - - U.sleep(300); - - Ignite ignite = startGrid(0); - - assertFalse(ignite.configuration().isClientMode()); - } - - return null; - } - }); - } - - for (int i = 0; i < 10; i++) { - log.info("Iteration: " + i); - - final long stopTime = U.currentTimeMillis() + 5_000; - - final AtomicInteger idx = new AtomicInteger(); - - IgniteInternalFuture<?> updateFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { - @Override public Void call() throws Exception { - int nodeIdx = idx.getAndIncrement() % updateNodes.size(); - - Ignite node = updateNodes.get(nodeIdx); - - log.info("Tx thread: " + node.name()); - - final IgniteTransactions txs = node.transactions(); - - final IgniteCache<Integer, Integer> cache = node.cache(cacheName); - - assertNotNull(cache); - - ThreadLocalRandom rnd = ThreadLocalRandom.current(); - - while (U.currentTimeMillis() < stopTime) { - final Map<Integer, Integer> keys = new LinkedHashMap<>(); - - for (int i = 0; i < KEYS / 2; i++) - keys.put(rnd.nextInt(KEYS), rnd.nextInt()); - - try { - if (restart) { - doInTransaction(node, OPTIMISTIC, REPEATABLE_READ, new Callable<Void>() { - @Override public Void call() throws Exception { - cache.putAll(keys); - - return null; - } - }); - 
} - else { - try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { - cache.putAll(keys); - - tx.commit(); - } - } - } - catch (TransactionOptimisticException ignore) { - // No-op. - } - catch (Throwable e) { - log.error("Unexpected error: " + e, e); - - throw e; - } - } - - return null; - } - }, threads, "tx-thread"); - - updateFut.get(60, SECONDS); - - IgniteCache<Integer, Integer> cache = ignite(1).cache(cacheName); - - for (int key = 0; key < KEYS; key++) { - Integer val = cache.get(key); - - for (int node = 1; node < SRVS + CLIENTS; node++) - assertEquals(val, ignite(node).cache(cache.getName()).get(key)); - } - } - - finished.set(true); - - if (fut != null) - fut.get(); - } - finally { - finished.set(true); - } - } - - /** - * @return Cache configurations. - */ - private List<CacheConfiguration<Integer, Integer>> cacheConfigurations() { - List<CacheConfiguration<Integer, Integer>> ccfgs = new ArrayList<>(); - - // No store, no near. - ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, false, false)); - ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false)); - ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 2, false, false)); - - // Store, no near. - ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, true, false)); - ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, true, false)); - ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 2, true, false)); - - // No store, near. - ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, false, true)); - ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, true)); - ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 2, false, true)); - - // Store, near. - ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, true, true)); - ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, true, true)); - ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 2, true, true)); - - return ccfgs; - } - - /** - * @param ccfg Cache configuration. 
- */ - private void logCacheInfo(CacheConfiguration<?, ?> ccfg) { - log.info("Test cache [mode=" + ccfg.getCacheMode() + - ", sync=" + ccfg.getWriteSynchronizationMode() + - ", backups=" + ccfg.getBackups() + - ", near=" + (ccfg.getNearConfiguration() != null) + - ", store=" + ccfg.isWriteThrough() + ']'); - } - - /** - * @param cache Cache. - * @param key Key. - * @param val Value. - * @throws Exception If failed. - */ - private void updateKey( - final IgniteCache<Integer, Integer> cache, - final Integer key, - final Integer val) throws Exception { - IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { - @Override public Void call() throws Exception { - IgniteTransactions txs = cache.unwrap(Ignite.class).transactions(); - - try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { - cache.put(key, val); - - tx.commit(); - } - - return null; - } - }, "update-thread"); - - fut.get(); - } - - /** - * @param releaseLatch Release lock latch. - * @param cache Cache. - * @param key Key. - * @return Future. - * @throws Exception If failed. - */ - private IgniteInternalFuture<?> lockKey( - final CountDownLatch releaseLatch, - final IgniteCache<Integer, Integer> cache, - final Integer key) throws Exception { - final CountDownLatch lockLatch = new CountDownLatch(1); - - IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { - @Override public Void call() throws Exception { - IgniteTransactions txs = cache.unwrap(Ignite.class).transactions(); - - try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { - cache.put(key, 1); - - log.info("Locked key: " + key); - - lockLatch.countDown(); - - assertTrue(releaseLatch.await(100000, SECONDS)); - - log.info("Commit tx: " + key); - - tx.commit(); - } - - return null; - } - }, "lock-thread"); - - assertTrue(lockLatch.await(10, SECONDS)); - - return fut; - } - - /** - * @param cacheMode Cache mode. - * @param syncMode Write synchronization mode. 
- * @param backups Number of backups. - * @param storeEnabled If {@code true} adds cache store. - * @param nearCache If {@code true} near cache is enabled. - * @return Cache configuration. - */ - private CacheConfiguration<Integer, Integer> cacheConfiguration( - CacheMode cacheMode, - CacheWriteSynchronizationMode syncMode, - int backups, - boolean storeEnabled, - boolean nearCache) { - CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(); - - ccfg.setCacheMode(cacheMode); - ccfg.setAtomicityMode(TRANSACTIONAL); - ccfg.setBackups(backups); - ccfg.setWriteSynchronizationMode(syncMode); - - if (storeEnabled) { - ccfg.setCacheStoreFactory(new TestStoreFactory()); - ccfg.setWriteThrough(true); - } - - if (nearCache) - ccfg.setNearConfiguration(new NearCacheConfiguration<Integer, Integer>()); - - return ccfg; - } - - /** - * - */ - private static class TestStoreFactory implements Factory<CacheStore<Integer, Integer>> { - /** {@inheritDoc} */ - @Override public CacheStore<Integer, Integer> create() { - return new CacheStoreAdapter<Integer, Integer>() { - @Override public Integer load(Integer key) throws CacheLoaderException { - return null; - } - - @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry) { - // No-op. - } - - @Override public void delete(Object key) { - // No-op. 
- } - }; - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/920d7472/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java index 2d29c49..3cdca6e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java @@ -93,6 +93,7 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; +import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; /** * @@ -862,6 +863,150 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac /** * @throws Exception If failed. */ + public void testOptimisticSerializableTx() throws Exception { + optimisticSerializableTx(null); + } + + /** + * @throws Exception If failed. + */ + public void testOptimisticSerializableTxNearEnabled() throws Exception { + optimisticSerializableTx(new NearCacheConfiguration()); + } + + /** + * @param nearCfg Near cache configuration. + * @throws Exception If failed. 
+     */
+    private void optimisticSerializableTx(NearCacheConfiguration nearCfg) throws Exception {
+        ccfg = new CacheConfiguration();
+
+        ccfg.setCacheMode(PARTITIONED);
+        ccfg.setBackups(1);
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setRebalanceMode(SYNC);
+        ccfg.setNearConfiguration(nearCfg);
+
+        IgniteEx ignite0 = startGrid(0);
+        IgniteEx ignite1 = startGrid(1);
+
+        client = true; // NOTE(review): presumably makes the next started node a client - confirm in getConfiguration().
+
+        final Ignite ignite2 = startGrid(2);
+
+        assertTrue(ignite2.configuration().isClientMode());
+
+        final Map<Integer, Integer> map = new HashMap<>();
+
+        for (int i = 0; i < 100; i++)
+            map.put(i, i);
+
+        TestCommunicationSpi spi = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi();
+
+        spi.blockMessages(GridNearTxPrepareRequest.class, ignite0.localNode().id()); // Phase 1: block prepare requests sent by the client to both initial nodes.
+        spi.blockMessages(GridNearTxPrepareRequest.class, ignite1.localNode().id());
+
+        spi.record(GridNearTxPrepareRequest.class);
+
+        final IgniteCache<Integer, Integer> cache = ignite2.cache(null);
+
+        IgniteInternalFuture<?> putFut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                Thread.currentThread().setName("put-thread");
+
+                try (Transaction tx = ignite2.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
+                    cache.putAll(map);
+
+                    tx.commit();
+                }
+
+                return null;
+            }
+        });
+
+        assertFalse(putFut.isDone()); // Transaction is not expected to finish while prepare requests are blocked.
+
+        client = false;
+
+        IgniteEx ignite3 = startGrid(3); // One more node is started while the transaction is still in progress.
+
+        log.info("Stop block1.");
+
+        spi.stopBlock();
+
+        putFut.get();
+
+        spi.record(null);
+
+        checkData(map, null, cache, 4); // NOTE(review): last argument presumably is the expected number of nodes - confirm against checkData().
+
+        List<Object> msgs = spi.recordedMessages();
+
+        for (Object msg : msgs)
+            assertTrue(((GridNearTxPrepareRequest)msg).firstClientRequest());
+
+        assertEquals(5, msgs.size());
+
+        ignite3.close();
+
+        for (int i = 0; i < 100; i++)
+            map.put(i, i + 1);
+
+        spi.blockMessages(GridNearTxPrepareRequest.class, ignite0.localNode().id()); // Phase 2: same scenario, but keys are put one by one instead of putAll.
+        spi.blockMessages(GridNearTxPrepareRequest.class, ignite1.localNode().id());
+
+        spi.record(GridNearTxPrepareRequest.class);
+
+        putFut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                Thread.currentThread().setName("put-thread");
+
+                try (Transaction tx = ignite2.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
+                    for (Map.Entry<Integer, Integer> e : map.entrySet())
+                        cache.put(e.getKey(), e.getValue());
+
+                    tx.commit();
+                }
+
+                return null;
+            }
+        });
+
+        ignite3 = startGrid(3);
+
+        log.info("Stop block2.");
+
+        spi.stopBlock();
+
+        putFut.get();
+
+        spi.record(null);
+
+        msgs = spi.recordedMessages();
+
+        for (Object msg : msgs)
+            assertTrue(((GridNearTxPrepareRequest)msg).firstClientRequest());
+
+        assertEquals(5, msgs.size());
+
+        checkData(map, null, cache, 4);
+
+        for (int i = 0; i < 100; i++)
+            map.put(i, i + 2);
+
+        try (Transaction tx = ignite2.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) { // Phase 3: no message blocking and no topology change.
+            cache.putAll(map);
+
+            tx.commit();
+        }
+
+        checkData(map, null, cache, 4);
+    }
+
+    /**
+     * @throws Exception If failed.
+ */ public void testLock() throws Exception { lock(null); } @@ -1816,6 +1961,8 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac super.sendMessage(msg.get1(), msg.get2()); } + + blockedMsgs.clear(); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/920d7472/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java index b89bffd..dbf8928 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java @@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.cache.CacheReadThroughReplicatedAto import org.apache.ignite.internal.processors.cache.CacheReadThroughReplicatedRestartSelfTest; import org.apache.ignite.internal.processors.cache.CacheReadThroughRestartSelfTest; import org.apache.ignite.internal.processors.cache.CacheRemoveAllSelfTest; +import org.apache.ignite.internal.processors.cache.CacheSerializableTransactionsTest; import org.apache.ignite.internal.processors.cache.CacheStopAndDestroySelfTest; import org.apache.ignite.internal.processors.cache.CacheStoreUsageMultinodeDynamicStartAtomicTest; import org.apache.ignite.internal.processors.cache.CacheStoreUsageMultinodeDynamicStartTxTest; @@ -139,6 +140,8 @@ public class IgniteCacheTestSuite4 extends TestSuite { public static TestSuite suite() throws Exception { TestSuite suite = new TestSuite("IgniteCache Test Suite part 4"); + suite.addTestSuite(CacheSerializableTransactionsTest.class); + // Multi node update. suite.addTestSuite(GridCacheMultinodeUpdateSelfTest.class); suite.addTestSuite(GridCacheMultinodeUpdateNearEnabledSelfTest.class);
