Repository: ignite Updated Branches: refs/heads/master c10be5780 -> 14f958b43
ignite-5283 Fix of transaction recovery on backup when primary node failed Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/14f958b4 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/14f958b4 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/14f958b4 Branch: refs/heads/master Commit: 14f958b43c789d2fa7c02914a1176e4df16b6a82 Parents: c10be57 Author: agura <[email protected]> Authored: Thu Jun 15 18:42:17 2017 +0300 Committer: agura <[email protected]> Committed: Thu Jun 15 18:45:17 2017 +0300 ---------------------------------------------------------------------- .../distributed/GridDistributedLockRequest.java | 20 ++ .../GridDistributedTxPrepareRequest.java | 20 ++ .../distributed/dht/GridDhtLockFuture.java | 1 + .../distributed/dht/GridDhtLockRequest.java | 4 + .../dht/GridDhtTransactionalCacheAdapter.java | 3 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 2 + .../dht/GridDhtTxPrepareRequest.java | 5 +- .../cache/distributed/dht/GridDhtTxRemote.java | 24 +- .../cache/transactions/IgniteTxHandler.java | 3 +- .../IgniteTxCachePrimarySyncTest.java | 34 ++- .../IgniteCachePutRetryAbstractSelfTest.java | 37 +-- .../dht/TxRecoveryStoreEnabledTest.java | 231 +++++++++++++++++++ .../IgniteCacheTxRecoverySelfTestSuite.java | 2 + 13 files changed, 351 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/14f958b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java index 74f34a9..25a557c 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java @@ -51,6 +51,9 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { /** Keep binary flag. */ private static final int KEEP_BINARY_FLAG_MASK = 0x02; + /** */ + private static final int STORE_USED_FLAG_MASK = 0x04; + /** Sender node ID. */ private UUID nodeId; @@ -252,6 +255,23 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { } /** + * @return Flag indicating whether the transaction uses a cache store. + */ + public boolean storeUsed() { + return (flags & STORE_USED_FLAG_MASK) != 0; + } + + /** + * @param storeUsed Store used value. + */ + public void storeUsed(boolean storeUsed) { + if (storeUsed) + flags = (byte)(flags | STORE_USED_FLAG_MASK); + else + flags &= ~STORE_USED_FLAG_MASK; + } + + /** + * @return Transaction isolation or <tt>null</tt> if not in transaction. 
*/ public TransactionIsolation isolation() { http://git-wip-us.apache.org/repos/asf/ignite/blob/14f958b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java index 3205c58..91dcd9e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java @@ -75,6 +75,9 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage /** */ private static final int SYSTEM_TX_FLAG_MASK = 0x10; + /** */ + public static final int STORE_WRITE_THROUGH_FLAG_MASK = 0x20; + /** Collection to message converter. */ private static final C1<Collection<UUID>, UUIDCollectionMessage> COL_TO_MSG = new C1<Collection<UUID>, UUIDCollectionMessage>() { @Override public UUIDCollectionMessage apply(Collection<UUID> uuids) { @@ -233,6 +236,23 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage } /** + * @return Flag indicating whether the transaction uses cache store write-through. + */ + public boolean storeWriteThrough() { + return (flags & STORE_WRITE_THROUGH_FLAG_MASK) != 0; + } + + /** + * @param storeWriteThrough Store write through value. + */ + public void storeWriteThrough(boolean storeWriteThrough) { + if (storeWriteThrough) + flags = (byte)(flags | STORE_WRITE_THROUGH_FLAG_MASK); + else + flags &= ~STORE_WRITE_THROUGH_FLAG_MASK; + } + + /** + * @return IO policy. 
*/ public byte policy() { http://git-wip-us.apache.org/repos/asf/ignite/blob/14f958b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index 388a434..5e62601 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -873,6 +873,7 @@ public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture<Boo inTx() ? tx.taskNameHash() : 0, read ? accessTtl : -1L, skipStore, + cctx.store().configured(), keepBinary, cctx.deploymentEnabled()); http://git-wip-us.apache.org/repos/asf/ignite/blob/14f958b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java index 1f81764..1ac5818 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java @@ -120,6 +120,7 @@ public class GridDhtLockRequest extends GridDistributedLockRequest { * @param taskNameHash Task name hash code. * @param accessTtl TTL for read operation. * @param skipStore Skip store flag. + * @param storeUsed Cache store used flag. 
* @param keepBinary Keep binary flag. * @param addDepInfo Deployment info flag. */ @@ -144,6 +145,7 @@ public class GridDhtLockRequest extends GridDistributedLockRequest { int taskNameHash, long accessTtl, boolean skipStore, + boolean storeUsed, boolean keepBinary, boolean addDepInfo ) { @@ -166,6 +168,8 @@ public class GridDhtLockRequest extends GridDistributedLockRequest { this.topVer = topVer; + storeUsed(storeUsed); + nearKeys = nearCnt == 0 ? Collections.<KeyCacheObject>emptyList() : new ArrayList<KeyCacheObject>(nearCnt); invalidateEntries = new BitSet(dhtCnt == 0 ? nearCnt : dhtCnt); http://git-wip-us.apache.org/repos/asf/ignite/blob/14f958b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index acba833..5d31581 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -255,7 +255,8 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach req.timeout(), req.txSize(), req.subjectId(), - req.taskNameHash()); + req.taskNameHash(), + !req.skipStore() && req.storeUsed()); tx = ctx.tm().onCreated(null, tx); http://git-wip-us.apache.org/repos/asf/ignite/blob/14f958b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 5b4b10f..6ed4781 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -1289,6 +1289,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite tx.subjectId(), tx.taskNameHash(), tx.activeCachesDeploymentEnabled(), + tx.storeWriteThrough(), retVal); int idx = 0; @@ -1401,6 +1402,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite tx.subjectId(), tx.taskNameHash(), tx.activeCachesDeploymentEnabled(), + tx.storeWriteThrough(), retVal); for (IgniteTxEntry entry : nearMapping.entries()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/14f958b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java index 631fe10..d334850 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java @@ -121,8 +121,9 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { * @param txNodes Transaction nodes mapping. * @param nearXidVer Near transaction ID. 
* @param last {@code True} if this is last prepare request for node. - * @param retVal Need return value flag. * @param addDepInfo Deployment info flag. + * @param storeWriteThrough Cache store write through flag. + * @param retVal Need return value flag. */ public GridDhtTxPrepareRequest( IgniteUuid futId, @@ -139,6 +140,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { UUID subjId, int taskNameHash, boolean addDepInfo, + boolean storeWriteThrough, boolean retVal) { super(tx, timeout, @@ -161,6 +163,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { this.subjId = subjId; this.taskNameHash = taskNameHash; + storeWriteThrough(storeWriteThrough); needReturnValue(retVal); invalidateNearEntries = new BitSet(dhtWrites == null ? 0 : dhtWrites.size()); http://git-wip-us.apache.org/repos/asf/ignite/blob/14f958b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java index f3c4963..4373cda 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java @@ -64,6 +64,9 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { /** Near transaction ID. */ private GridCacheVersion nearXidVer; + /** Store write through flag. */ + private boolean storeWriteThrough; + /** * Empty constructor required for {@link Externalizable}. */ @@ -74,6 +77,7 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { /** * This constructor is meant for optimistic transactions. 
* + * @param ctx Cache context. * @param nearNodeId Near node ID. * @param rmtFutId Remote future ID. * @param nodeId Node ID. @@ -85,10 +89,10 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { * @param isolation Transaction isolation. * @param invalidate Invalidate flag. * @param timeout Timeout. - * @param ctx Cache context. * @param txSize Expected transaction size. * @param nearXidVer Near transaction ID. * @param txNodes Transaction nodes mapping. + * @param storeWriteThrough Cache store write through flag. */ public GridDhtTxRemote( GridCacheSharedContext ctx, @@ -109,8 +113,8 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { Map<UUID, Collection<UUID>> txNodes, @Nullable UUID subjId, int taskNameHash, - boolean single - ) { + boolean single, + boolean storeWriteThrough) { super( ctx, nodeId, @@ -134,6 +138,7 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { this.rmtFutId = rmtFutId; this.nearXidVer = nearXidVer; this.txNodes = txNodes; + this.storeWriteThrough = storeWriteThrough; txState = single ? new IgniteTxRemoteSingleStateImpl() : new IgniteTxRemoteStateImpl( @@ -148,6 +153,7 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { /** * This constructor is meant for pessimistic transactions. * + * @param ctx Cache context. * @param nearNodeId Near node ID. * @param rmtFutId Remote future ID. * @param nodeId Node ID. @@ -160,8 +166,8 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { * @param isolation Transaction isolation. * @param invalidate Invalidate flag. * @param timeout Timeout. - * @param ctx Cache context. * @param txSize Expected transaction size. + * @param storeWriteThrough Cache store write through flag. 
*/ public GridDhtTxRemote( GridCacheSharedContext ctx, @@ -180,8 +186,8 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { long timeout, int txSize, @Nullable UUID subjId, - int taskNameHash - ) { + int taskNameHash, + boolean storeWriteThrough) { super( ctx, nodeId, @@ -204,6 +210,7 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { this.nearXidVer = nearXidVer; this.nearNodeId = nearNodeId; this.rmtFutId = rmtFutId; + this.storeWriteThrough = storeWriteThrough; txState = new IgniteTxRemoteStateImpl( Collections.<IgniteTxKey, IgniteTxEntry>emptyMap(), @@ -227,6 +234,11 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { } /** {@inheritDoc} */ + @Override public boolean storeWriteThrough() { + return storeWriteThrough; + } + + /** {@inheritDoc} */ @Override public UUID eventNodeId() { return nearNodeId(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/14f958b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index b958a27..7efd6ec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -1437,7 +1437,8 @@ public class IgniteTxHandler { req.transactionNodes(), req.subjectId(), req.taskNameHash(), - single); + single, + req.storeWriteThrough()); tx.writeVersion(req.writeVersion()); http://git-wip-us.apache.org/repos/asf/ignite/blob/14f958b4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCachePrimarySyncTest.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCachePrimarySyncTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCachePrimarySyncTest.java index 55d7fe4..1fcbd21 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCachePrimarySyncTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCachePrimarySyncTest.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import javax.cache.Cache; import javax.cache.configuration.Factory; import javax.cache.integration.CacheLoaderException; @@ -1103,19 +1104,30 @@ public class IgniteTxCachePrimarySyncTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public CacheStore<Object, Object> create() { - return new CacheStoreAdapter() { - @Override public Object load(Object key) throws CacheLoaderException { - return null; - } + return new TestCacheStore(); + } + } - @Override public void write(Cache.Entry entry) throws CacheWriterException { - // No-op. - } + /** + * + */ + private static class TestCacheStore extends CacheStoreAdapter<Object, Object> { + /** Store map. */ + private static final Map STORE_MAP = new ConcurrentHashMap(); - @Override public void delete(Object key) throws CacheWriterException { - // No-op. 
- } - }; + /** {@inheritDoc} */ + @Override public Object load(Object key) throws CacheLoaderException { + return STORE_MAP.get(key); + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry<?, ?> entry) throws CacheWriterException { + STORE_MAP.put(entry.getKey(), entry.getValue()); + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) throws CacheWriterException { + STORE_MAP.remove(key); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/14f958b4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java index d9b9663..e65bff4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Random; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import javax.cache.Cache; import javax.cache.configuration.Factory; @@ -45,7 +46,6 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; -import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import 
org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; @@ -58,7 +58,6 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; -import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager.DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT; import static org.apache.ignite.testframework.GridTestUtils.runAsync; /** @@ -599,25 +598,33 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst } } + private static class TestStoreFactory implements Factory<CacheStore> { + /** {@inheritDoc} */ + @Override public CacheStore create() { + return new TestCacheStore(); + } + } + /** * */ - private static class TestStoreFactory implements Factory<CacheStore> { + private static class TestCacheStore extends CacheStoreAdapter { + /** Store map. */ + private static Map STORE_MAP = new ConcurrentHashMap(); + /** {@inheritDoc} */ - @Override public CacheStore create() { - return new CacheStoreAdapter() { - @Override public Object load(Object key) throws CacheLoaderException { - return null; - } + @Override public Object load(Object key) throws CacheLoaderException { + return STORE_MAP.get(key); + } - @Override public void write(Cache.Entry entry) throws CacheWriterException { - // No-op. - } + /** {@inheritDoc} */ + @Override public void write(Cache.Entry entry) throws CacheWriterException { + STORE_MAP.put(entry.getKey(), entry.getValue()); + } - @Override public void delete(Object key) throws CacheWriterException { - // No-op. 
- } - }; + /** {@inheritDoc} */ + @Override public void delete(Object key) throws CacheWriterException { + STORE_MAP.remove(key); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/14f958b4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/TxRecoveryStoreEnabledTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/TxRecoveryStoreEnabledTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/TxRecoveryStoreEnabledTest.java new file mode 100644 index 0000000..7b350c8 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/TxRecoveryStoreEnabledTest.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import java.util.concurrent.CountDownLatch; +import javax.cache.Cache; +import javax.cache.configuration.Factory; +import javax.cache.integration.CacheLoaderException; +import javax.cache.integration.CacheWriterException; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.cache.store.CacheStoreAdapter; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; + +import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; + +/** + * + */ +public class TxRecoveryStoreEnabledTest extends GridCommonAbstractTest { + /** Nodes count. */ + private static final int NODES_CNT = 2; + + /** Cache name. */ + public static final String CACHE_NAME = "cache"; + + /** Latch. 
*/ + private static CountDownLatch latch; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setCommunicationSpi(new TestCommunicationSpi()); + cfg.setDiscoverySpi(new TestDiscoverySpi()); + + CacheConfiguration ccfg = defaultCacheConfiguration(); + + ccfg.setName(CACHE_NAME); + ccfg.setNearConfiguration(null); + ccfg.setCacheMode(CacheMode.PARTITIONED); + ccfg.setBackups(1); + ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + ccfg.setCacheStoreFactory(new TestCacheStoreFactory()); + ccfg.setReadThrough(true); + ccfg.setWriteThrough(true); + ccfg.setWriteBehindEnabled(false); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + latch = new CountDownLatch(1); + + startGrids(NODES_CNT); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testOptimistic() throws Exception { + checkTxRecovery(OPTIMISTIC); + } + + /** + * @throws Exception If failed. + */ + public void testPessimistic() throws Exception { + checkTxRecovery(PESSIMISTIC); + } + + /** + * @throws Exception If failed. 
+ */ + private void checkTxRecovery(TransactionConcurrency concurrency) throws Exception { + final Ignite node0 = ignite(0); + Ignite node1 = ignite(1); + + IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() { + @Override public void run() { + try { + latch.await(); + + IgniteConfiguration cfg = node0.configuration(); + + ((TestCommunicationSpi)cfg.getCommunicationSpi()).block(); + ((TestDiscoverySpi)cfg.getDiscoverySpi()).simulateNodeFailure(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + }, 1); + + IgniteCache<Object, Object> cache0 = node0.cache(CACHE_NAME); + + Integer key = primaryKey(cache0); + + try (Transaction tx = node0.transactions().txStart(concurrency, READ_COMMITTED)) { + cache0.put(key, key); + + tx.commit(); + } + catch (Exception e) { + // No-op. + } + + fut.get(); + + IgniteCache<Object, Object> cache1 = node1.cache(CACHE_NAME); + + assertNull(cache1.get(key)); + } + + /** + * + */ + private static class TestCacheStoreFactory implements Factory<CacheStore> { + /** {@inheritDoc} */ + @Override public CacheStore create() { + return new TestCacheStore(); + } + } + + /** + * + */ + private static class TestCacheStore extends CacheStoreAdapter<Integer, Integer> { + /** {@inheritDoc} */ + @Override public void sessionEnd(boolean commit) { + if (latch.getCount() > 0) { // Need wait only on primary node. + latch.countDown(); + + try { + U.sleep(3000); + } + catch (IgniteInterruptedCheckedException e) { + Thread.currentThread().interrupt(); + } + } + } + + /** {@inheritDoc} */ + @Override public Integer load(Integer key) throws CacheLoaderException { + return null; + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry) throws CacheWriterException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) throws CacheWriterException { + // no-op. 
+ } + } + + /** + * + */ + private static class TestDiscoverySpi extends TcpDiscoverySpi { + /** {@inheritDoc} */ + @Override protected void simulateNodeFailure() { + super.simulateNodeFailure(); + } + } + + /** + * + */ + private static class TestCommunicationSpi extends TcpCommunicationSpi { + /** Block. */ + private volatile boolean block; + + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException { + if (!block) + super.sendMessage(node, msg); + } + + /** + * + */ + private void block() { + block = true; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/14f958b4/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java index 7bd7797..a2c6c83 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java @@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCachePa import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCachePartitionedPrimaryNodeFailureRecoveryTest; import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest; import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheTxRecoveryRollbackTest; +import org.apache.ignite.internal.processors.cache.distributed.dht.TxRecoveryStoreEnabledTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheNearTxPessimisticOriginatingNodeFailureSelfTest; import 
org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedTxOriginatingNodeFailureSelfTest; import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedTxPessimisticOriginatingNodeFailureSelfTest; @@ -56,6 +57,7 @@ public class IgniteCacheTxRecoverySelfTestSuite extends TestSuite { suite.addTestSuite(GridCacheReplicatedTxPessimisticOriginatingNodeFailureSelfTest.class); suite.addTestSuite(IgniteCacheTxRecoveryRollbackTest.class); + suite.addTestSuite(TxRecoveryStoreEnabledTest.class); return suite; }
