Repository: ignite Updated Branches: refs/heads/ignite-2.5 44401882d -> 111592e2c
IGNITE-8435 StorageException is handled like NodeStoppingException during failing transaction commit Signed-off-by: Andrey Gura <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/111592e2 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/111592e2 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/111592e2 Branch: refs/heads/ignite-2.5 Commit: 111592e2ccd3af466433f78626c1d06d4b5ece4c Parents: 4440188 Author: Anton Kalashnikov <[email protected]> Authored: Tue May 8 16:35:05 2018 +0300 Committer: Andrey Gura <[email protected]> Committed: Tue May 8 17:02:38 2018 +0300 ---------------------------------------------------------------------- .../internal/InvalidEnvironmentException.java | 25 ++ .../ignite/internal/NodeStoppingException.java | 2 +- .../internal/pagemem/wal/StorageException.java | 3 +- .../GridDistributedTxRemoteAdapter.java | 8 +- .../distributed/dht/GridDhtTxFinishFuture.java | 8 +- .../transactions/IgniteTxLocalAdapter.java | 8 +- .../apache/ignite/internal/util/typedef/X.java | 4 +- .../failure/AccountTransferTransactionTest.java | 331 +++++++++++++++++++ .../ignite/testsuites/IgniteBasicTestSuite.java | 2 + 9 files changed, 375 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/111592e2/modules/core/src/main/java/org/apache/ignite/internal/InvalidEnvironmentException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/InvalidEnvironmentException.java b/modules/core/src/main/java/org/apache/ignite/internal/InvalidEnvironmentException.java new file mode 100644 index 0000000..d45a443 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/InvalidEnvironmentException.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +/** + * Marker interface of invalid environment exception. + */ +public interface InvalidEnvironmentException { + +} http://git-wip-us.apache.org/repos/asf/ignite/blob/111592e2/modules/core/src/main/java/org/apache/ignite/internal/NodeStoppingException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/NodeStoppingException.java b/modules/core/src/main/java/org/apache/ignite/internal/NodeStoppingException.java index 75447a1..cc39b14 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/NodeStoppingException.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/NodeStoppingException.java @@ -22,7 +22,7 @@ import org.apache.ignite.IgniteCheckedException; /** * */ -public class NodeStoppingException extends IgniteCheckedException { +public class NodeStoppingException extends IgniteCheckedException implements InvalidEnvironmentException { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/ignite/blob/111592e2/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/StorageException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/StorageException.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/StorageException.java index 2da08b9..debc391 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/StorageException.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/StorageException.java @@ -19,12 +19,13 @@ package org.apache.ignite.internal.pagemem.wal; import java.io.IOException; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.InvalidEnvironmentException; import org.jetbrains.annotations.NotNull; /** * Exception is needed to distinguish WAL manager & page store critical I/O errors. */ -public class StorageException extends IgniteCheckedException { +public class StorageException extends IgniteCheckedException implements InvalidEnvironmentException { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/ignite/blob/111592e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index a692b2e..5e3111c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -28,8 +28,8 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.InvalidEnvironmentException; import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.pagemem.wal.StorageException; import org.apache.ignite.internal.pagemem.wal.WALPointer; import org.apache.ignite.internal.pagemem.wal.record.DataEntry; @@ -734,21 +734,21 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter } } catch (Throwable ex) { - boolean nodeStopping = X.hasCause(ex, NodeStoppingException.class); + boolean hasIOIssue = X.hasCause(ex, InvalidEnvironmentException.class); // In case of error, we still make the best effort to commit, // as there is no way to rollback at this point. err = new IgniteTxHeuristicCheckedException("Commit produced a runtime exception " + "(all transaction entries will be invalidated): " + CU.txString(this), ex); - if (nodeStopping) { + if (hasIOIssue) { U.warn(log, "Failed to commit transaction, node is stopping [tx=" + this + ", err=" + ex + ']'); } else U.error(log, "Commit failed.", err); - uncommit(nodeStopping); + uncommit(hasIOIssue); state(UNKNOWN); http://git-wip-us.apache.org/repos/asf/ignite/blob/111592e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index 6380710..0ed8419 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -26,10 +26,10 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.InvalidEnvironmentException; import org.apache.ignite.internal.IgniteDiagnosticAware; import org.apache.ignite.internal.IgniteDiagnosticPrepareContext; import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture; import org.apache.ignite.internal.processors.cache.GridCacheFuture; @@ -168,7 +168,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity if (ERR_UPD.compareAndSet(this, null, e)) { tx.setRollbackOnly(); - if (X.hasCause(e, NodeStoppingException.class)) + if (X.hasCause(e, InvalidEnvironmentException.class)) onComplete(); else finish(false); @@ -225,9 +225,9 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity if (this.tx.onePhaseCommit() && (this.tx.state() == COMMITTING)) { try { - boolean nodeStop = err != null && X.hasCause(err, NodeStoppingException.class); + boolean hasInvalidEnvironmentIssue = X.hasCause(err, InvalidEnvironmentException.class); - this.tx.tmFinish(err == null, nodeStop, false); + this.tx.tmFinish(err == null, hasInvalidEnvironmentIssue, false); } catch (IgniteCheckedException finishErr) { U.error(log, "Failed to finish tx: " + tx, e); http://git-wip-us.apache.org/repos/asf/ignite/blob/111592e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 61650cc..6f11a57 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -30,8 +30,8 @@ import javax.cache.expiry.ExpiryPolicy; import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.internal.InvalidEnvironmentException; import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.pagemem.wal.StorageException; import org.apache.ignite.internal.pagemem.wal.WALPointer; import org.apache.ignite.internal.pagemem.wal.record.DataEntry; @@ -846,13 +846,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig throw ex; } else { - boolean nodeStopping = X.hasCause(ex, NodeStoppingException.class); + boolean hasInvalidEnvironmentIssue = X.hasCause(ex, InvalidEnvironmentException.class); IgniteCheckedException err = new IgniteTxHeuristicCheckedException("Failed to locally write to cache " + "(all transaction entries will be invalidated, however there was a window when " + "entries for this transaction were visible to others): " + this, ex); - if (nodeStopping) { + if (hasInvalidEnvironmentIssue) { U.warn(log, "Failed to commit transaction, node is stopping " + "[tx=" + this + ", err=" + ex + ']'); } @@ -865,7 +865,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig try { // Courtesy to minimize damage. - uncommit(nodeStopping); + uncommit(hasInvalidEnvironmentIssue); } catch (Throwable ex1) { U.error(log, "Failed to uncommit transaction: " + this, ex1); http://git-wip-us.apache.org/repos/asf/ignite/blob/111592e2/modules/core/src/main/java/org/apache/ignite/internal/util/typedef/X.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/typedef/X.java b/modules/core/src/main/java/org/apache/ignite/internal/util/typedef/X.java index 1a43daa..49732b6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/typedef/X.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/typedef/X.java @@ -433,14 +433,14 @@ public final class X { * {@code false} otherwise. */ @SafeVarargs - public static boolean hasCause(@Nullable Throwable t, @Nullable Class<? extends Throwable>... cls) { + public static boolean hasCause(@Nullable Throwable t, @Nullable Class<?>... cls) { if (t == null || F.isEmpty(cls)) return false; assert cls != null; for (Throwable th = t; th != null; th = th.getCause()) { - for (Class<? extends Throwable> c : cls) { + for (Class<?> c : cls) { if (c.isAssignableFrom(th.getClass())) return true; } http://git-wip-us.apache.org/repos/asf/ignite/blob/111592e2/modules/core/src/test/java/org/apache/ignite/failure/AccountTransferTransactionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/failure/AccountTransferTransactionTest.java b/modules/core/src/test/java/org/apache/ignite/failure/AccountTransferTransactionTest.java new file mode 100644 index 0000000..8d7cf15 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/failure/AccountTransferTransactionTest.java @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.failure; + +import javax.management.MBeanServer; +import javax.management.MBeanServerInvocationHandler; +import javax.management.ObjectName; +import java.lang.management.ManagementFactory; +import java.util.ArrayList; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cache.eviction.fifo.FifoEvictionPolicy; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.internal.worker.WorkersControlMXBeanImpl; +import org.apache.ignite.mxbean.WorkersControlMXBean; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.jetbrains.annotations.NotNull; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; + +/** + * Test transfer amount between accounts with enabled {@link StopNodeFailureHandler}. + */ +public class AccountTransferTransactionTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + /** Count of accounts in one thread. */ + private static final int ACCOUNTS_CNT = 20; + /** Count of threads and caches. */ + private static final int THREADS_CNT = 20; + /** Count of nodes to start. */ + private static final int NODES_CNT = 3; + /** Count of transaction on cache. */ + private static final int TRANSACTION_CNT = 10; + + /** {@inheritDoc} */ + @Override protected FailureHandler getFailureHandler(String igniteInstanceName) { + return new StopNodeFailureHandler(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String name) throws Exception { + final IgniteConfiguration cfg = super.getConfiguration(name); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + cfg.setCommunicationSpi(new TestRecordingCommunicationSpi()); + cfg.setLocalHost("127.0.0.1"); + cfg.setDataStorageConfiguration(new DataStorageConfiguration() + .setDefaultDataRegionConfiguration(new DataRegionConfiguration() + .setMaxSize(50 * 1024 * 1024) + .setPersistenceEnabled(true)) + ); + + CacheConfiguration[] cacheConfigurations = new CacheConfiguration[THREADS_CNT]; + for (int i = 0; i < THREADS_CNT; i++) { + cacheConfigurations[i] = new CacheConfiguration() + .setName(cacheName(i)) + .setAffinity(new RendezvousAffinityFunction(false, 32)) + .setBackups(1) + .setAtomicityMode(TRANSACTIONAL) + .setCacheMode(CacheMode.PARTITIONED) + .setWriteSynchronizationMode(FULL_SYNC) + .setEvictionPolicy(new FifoEvictionPolicy(1000)) + .setOnheapCacheEnabled(true); + } + + cfg.setCacheConfiguration(cacheConfigurations); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** + * Test transfer amount. + */ + public void testTransferAmount() throws Exception { + //given: started some nodes with client. + startGrids(NODES_CNT); + + IgniteEx igniteClient = startGrid(getClientConfiguration(NODES_CNT)); + + igniteClient.cluster().active(true); + + Random random = new Random(); + + long[] initAmount = new long[THREADS_CNT]; + + //and: fill all accounts on all caches and calculate total amount for every cache. + for (int cachePrefixIdx = 0; cachePrefixIdx < THREADS_CNT; cachePrefixIdx++) { + IgniteCache<Object, Object> cache = igniteClient.getOrCreateCache(cacheName(cachePrefixIdx)); + + try (Transaction tx = igniteClient.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + for (int accountId = 0; accountId < ACCOUNTS_CNT; accountId++) { + Long amount = (long)random.nextInt(1000); + + cache.put(accountId, amount); + + initAmount[cachePrefixIdx] += amount; + } + + tx.commit(); + } + } + + //when: start transfer amount from account to account in different threads. + CountDownLatch firstTransactionDone = new CountDownLatch(THREADS_CNT); + + ArrayList<Thread> transferThreads = new ArrayList<>(); + + for (int i = 0; i < THREADS_CNT; i++) { + transferThreads.add(new TransferAmountTxThread(firstTransactionDone, igniteClient, cacheName(i))); + + transferThreads.get(i).start(); + } + + firstTransactionDone.await(10, TimeUnit.SECONDS); + + //and: terminate disco-event-worker thread on one node. + WorkersControlMXBean bean = workersMXBean(1); + + bean.terminateWorker( + bean.getWorkerNames().stream() + .filter(name -> name.startsWith("disco-event-worker")) + .findFirst() + .orElse(null) + ); + + for (Thread thread : transferThreads) { + thread.join(); + } + + long[] resultAmount = new long[THREADS_CNT]; + + //then: calculate total amount for every thread. + for (int j = 0; j < THREADS_CNT; j++) { + String cacheName = cacheName(j); + + IgniteCache<Object, Object> cache = igniteClient.getOrCreateCache(cacheName); + + try (Transaction tx = igniteClient.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + + for (int i = 0; i < ACCOUNTS_CNT; i++) + resultAmount[j] += getNotNullValue(cache, i); + tx.commit(); + } + + long diffAmount = initAmount[j] - resultAmount[j]; + + //and: check that result amount equal to init amount. + assertTrue( + String.format("Total amount before and after transfer is not same: diff=%s, cache=%s", + diffAmount, cacheName), + diffAmount == 0 + ); + } + } + + /** + * Make test cache name by prefix. + */ + @NotNull private String cacheName(int cachePrefixIdx) { + return "cache" + cachePrefixIdx; + } + + /** + * Ignite configuration for client. + */ + @NotNull private IgniteConfiguration getClientConfiguration(int nodesPrefix) throws Exception { + IgniteConfiguration clientConf = getConfiguration(getTestIgniteInstanceName(nodesPrefix)); + + clientConf.setClientMode(true); + + return clientConf; + } + + /** + * Extract not null value from cache. + */ + private long getNotNullValue(IgniteCache<Object, Object> cache, int i) { + Object value = cache.get(i); + + return value == null ? 0 : ((Long)value); + } + + /** + * Configure workers mx bean. + */ + private WorkersControlMXBean workersMXBean(int igniteInt) throws Exception { + ObjectName mbeanName = U.makeMBeanName( + getTestIgniteInstanceName(igniteInt), + "Kernal", + WorkersControlMXBeanImpl.class.getSimpleName() + ); + + MBeanServer mbeanSrv = ManagementFactory.getPlatformMBeanServer(); + + if (!mbeanSrv.isRegistered(mbeanName)) + fail("MBean is not registered: " + mbeanName.getCanonicalName()); + + return MBeanServerInvocationHandler.newProxyInstance(mbeanSrv, mbeanName, WorkersControlMXBean.class, true); + } + + /** + * + */ + private static class TransferAmountTxThread extends Thread { + /** */ + private CountDownLatch firstTransactionLatch; + /** */ + private Ignite ignite; + /** */ + private String cacheName; + /** */ + private Random random = new Random(); + + /** + * @param ignite Ignite. + */ + private TransferAmountTxThread(CountDownLatch firstTransactionLatch, final Ignite ignite, String cacheName) { + this.firstTransactionLatch = firstTransactionLatch; + this.ignite = ignite; + this.cacheName = cacheName; + } + + /** {@inheritDoc} */ + @Override public void run() { + for (int i = 0; i < TRANSACTION_CNT; i++) { + try { + updateInTransaction(ignite.cache(cacheName)); + } + finally { + if (i == 0) + firstTransactionLatch.countDown(); + } + } + } + + /** + * @throws IgniteException if fails + */ + @SuppressWarnings("unchecked") + private void updateInTransaction(IgniteCache cache) throws IgniteException { + int accIdFrom = random.nextInt(ACCOUNTS_CNT); + int accIdTo = random.nextInt(ACCOUNTS_CNT); + + if (accIdFrom == accIdTo) + accIdTo = (int)getNextAccountId(accIdFrom); + + Long acctFrom; + Long acctTo; + + try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + acctFrom = (Long)cache.get(accIdFrom); + acctTo = (Long)cache.get(accIdTo); + + long transactionAmount = (long)(random.nextDouble() * acctFrom); + + cache.put(accIdFrom, acctFrom - transactionAmount); + cache.put(accIdTo, acctTo + transactionAmount); + + tx.commit(); + } + } + + /** + * @param curr current + * @return random value + */ + private long getNextAccountId(long curr) { + long randomVal; + + do { + randomVal = random.nextInt(ACCOUNTS_CNT); + } + while (curr == randomVal); + + return randomVal; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/111592e2/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java index e71a569..21d56b4 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java @@ -20,6 +20,7 @@ package org.apache.ignite.testsuites; import java.util.Set; import junit.framework.TestSuite; import org.apache.ignite.GridSuppressedExceptionSelfTest; +import org.apache.ignite.failure.AccountTransferTransactionTest; import org.apache.ignite.failure.FailureHandlerTriggeredTest; import org.apache.ignite.failure.IoomFailureHandlerTest; import org.apache.ignite.failure.OomFailureHandlerTest; @@ -205,6 +206,7 @@ public class IgniteBasicTestSuite extends TestSuite { suite.addTestSuite(StopNodeOrHaltFailureHandlerTest.class); suite.addTestSuite(IoomFailureHandlerTest.class); suite.addTestSuite(OomFailureHandlerTest.class); + suite.addTestSuite(AccountTransferTransactionTest.class); return suite; }
