IGNITE-6181 wip.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9da2dde3 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9da2dde3 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9da2dde3 Branch: refs/heads/ignite-6181-1 Commit: 9da2dde3e7818d07d39756f2b92255d15acc147e Parents: 61dfb65 Author: Aleksei Scherbakov <[email protected]> Authored: Wed Aug 30 18:12:10 2017 +0300 Committer: Aleksei Scherbakov <[email protected]> Committed: Wed Aug 30 18:12:10 2017 +0300 ---------------------------------------------------------------------- .../cache/distributed/near/GridNearTxLocal.java | 65 ++-- .../cache/transactions/IgniteTxManager.java | 42 +-- .../transactions/TxRollbackOnTimeoutTest.java | 303 +++++++++++++++++++ .../transactions/TxTimeoutHandlingTest.java | 147 --------- 4 files changed, 335 insertions(+), 222 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9da2dde3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index ac35ad9..ebae49f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -34,7 +34,6 @@ import javax.cache.CacheException; import javax.cache.expiry.ExpiryPolicy; import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.NodeStoppingException; @@ -119,7 +118,7 @@ import static org.apache.ignite.transactions.TransactionState.UNKNOWN; * Replicated user transaction. */ @SuppressWarnings("unchecked") -public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoCloseable { +public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeoutObject, AutoCloseable { /** */ private static final long serialVersionUID = 0L; @@ -176,8 +175,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea private TransactionProxyImpl proxy; /** */ - @GridToStringExclude - private GridTimeoutObject timeoutHandler; + private long endTime; /** * Empty constructor required for {@link Externalizable}. @@ -224,7 +222,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea plc, concurrency, isolation, - timeout, + timeout == 0 ? 0 : Math.max(100, timeout), false, storeEnabled, false, @@ -234,6 +232,12 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea mappings = implicitSingle ? new IgniteTxMappingsSingleImpl() : new IgniteTxMappingsImpl(); + if (this.timeout > 0) { + endTime = U.currentTimeMillis() + this.timeout; + + cctx.time().addTimeoutObject(this); + } + initResult(); } @@ -3152,11 +3156,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea // Prepare was called explicitly. return fut; - // Clears timeout handler before prepare, because prepare phase will clean up everything right up. - if (!clearTimeoutHandler()) { - // Timeout closure has been invoked, tx will be rolled back. - throw new IgniteException("Bad"); - } + if (endTime > 0) + cctx.time().removeTimeoutObject(this); mapExplicitLocks(); @@ -3180,15 +3181,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea } /** - * Clears timeout handler. - */ - public boolean clearTimeoutHandler() { - timeoutHandler = null; - - return true; - } - - /** * @return Prepare future. */ private IgniteInternalFuture<?> prepareAsync() { @@ -3269,7 +3261,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea * @throws IgniteCheckedException If failed. */ public void rollback() throws IgniteCheckedException { - clearTimeoutHandler(); + if (endTime > 0) + cctx.time().removeTimeoutObject(this); rollbackNearTxLocalAsync().get(); } @@ -3786,20 +3779,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea } /** - * @return Timeout handler. - */ - public GridTimeoutObject timeoutHandler() { - return timeoutHandler; - } - - /** - * @param timeoutHnd New timeout handler. - */ - public void timeoutHandler(GridTimeoutObject timeoutHnd) { - timeoutHandler = timeoutHnd; - } - - /** * @return Public API proxy. */ public TransactionProxy proxy() { @@ -4082,6 +4061,24 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea } /** {@inheritDoc} */ + @Override public IgniteUuid timeoutId() { + return xid(); + } + + /** {@inheritDoc} */ + @Override public long endTime() { + return endTime; + } + + /** {@inheritDoc} */ + @Override public void onTimeout() { + log.error("Transaction is timed out and will be rolled back: [tx=" + this + ']'); + + if (setRollbackOnly()) + rollbackAsync(); + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridNearTxLocal.class, this, "thread", IgniteUtils.threadName(threadId), http://git-wip-us.apache.org/repos/asf/ignite/blob/9da2dde3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 64624dd..a3bf205 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -460,12 +460,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { tx.topologyVersion(topVer); } - tx = onCreated(sysCacheCtx, tx); - - if (tx != null && tx.timeout() > 0) - cctx.time().addTimeoutObject(new TxEagerTimeoutObject(tx)); - - return tx; + return onCreated(sysCacheCtx, tx); } /** @@ -2405,41 +2400,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } /** - * Timeout object for tx timeout handler. - */ - private final class TxEagerTimeoutObject extends GridTimeoutObjectAdapter { - /** */ - private final GridNearTxLocal tx; - - /** - * @param tx Tx. - */ - public TxEagerTimeoutObject(final GridNearTxLocal tx) { - super(tx.timeout()); - - this.tx = tx; - - tx.timeoutHandler(this); - } - - /** {@inheritDoc} */ - @Override public void onTimeout() { - GridTimeoutObject hnd = tx.timeoutHandler(); - - if (hnd == null) - return; - - if (!tx.clearTimeoutHandler()) // Transaction is prepared or rolled back concurrently. - return; - - log.error("Transaction is timed out and will be rolled back: [tx=" + tx + ']'); - - if (tx.setRollbackOnly()) - tx.rollbackAsync(); - } - } - - /** * Per-thread key for system transactions. */ private static class TxThreadKey { http://git-wip-us.apache.org/repos/asf/ignite/blob/9da2dde3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java new file mode 100644 index 0000000..e93cff1 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.transactions; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.TransactionConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; +import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; +import org.apache.ignite.internal.util.GridConcurrentSkipListSet; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; + +/** + * Tests an ability to eagerly rollback timed out transactions. + */ +public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest { + /** */ + private static final long TX_TIMEOUT = 3_000L; + + /** */ + private static final String CACHE_NAME = "test"; + + /** IP finder. */ + private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private final CountDownLatch blocked = new CountDownLatch(1); + + /** */ + private CountDownLatch unblocked = new CountDownLatch(1); + + /** */ + private static int GRID_CNT = 3; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setClientMode("client".equals(igniteInstanceName)); + + cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER)); + + TransactionConfiguration txCfg = new TransactionConfiguration(); + txCfg.setDefaultTxTimeout(TX_TIMEOUT); + + cfg.setTransactionConfiguration(txCfg); + + CacheConfiguration ccfg = new CacheConfiguration(CACHE_NAME); + ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + ccfg.setBackups(2); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + startGridsMultiThreaded(GRID_CNT); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** + * Tests if timeout on first tx unblocks second tx waiting for the locked key. + */ + public void testWaitingTxUnblockedOnTimeout1() throws Exception { + testWaitingTxUnblockedOnTimeout0(grid(0), grid(0)); + } + + /** + * Tests if timeout on first tx unblocks second tx waiting for the locked key. + */ + public void testWaitingTxUnblockedOnTimeout2() throws Exception { + testWaitingTxUnblockedOnTimeout0(grid(0), grid(1)); + } + + /** + * Tests if timeout on first tx unblocks second tx waiting for the locked key. + */ + public void testWaitingTxUnblockedOnTimeout3() throws Exception { + Ignite client = startGrid("client"); + + testWaitingTxUnblockedOnTimeout0(grid(0), client); + } + + /** + * Tests if timeout on first tx unblocks second tx waiting for the locked key. + */ + public void testWaitingTxUnblockedOnTimeout4() throws Exception { + Ignite client = startGrid("client"); + + testWaitingTxUnblockedOnTimeout0(client, grid(0)); + } + + /** + * Tests if timeout on first tx unblocks second tx waiting for the locked key. + */ + public void testWaitingTxUnblockedOnTimeout5() throws Exception { + Ignite client = startGrid("client"); + + testWaitingTxUnblockedOnTimeout0(client, client); + } + + /** + * Tests if timeout on first tx unblocks second tx waiting for the locked key. + */ + public void testWaitingTxUnblockedOnTimeout6() throws Exception { + testWaitingTxUnblockedOnThreadDeath0(grid(0), grid(0)); + } + + /** + * Tests if timeout on first tx unblocks second tx waiting for the locked key. + */ + public void testWaitingTxUnblockedOnTimeout7() throws Exception { + testWaitingTxUnblockedOnThreadDeath0(grid(0), grid(1)); + } + + /** + * Tests if timeout on first tx unblocks second tx waiting for the locked key. + */ + public void testWaitingTxUnblockedOnTimeout8() throws Exception { + Ignite client = startGrid("client"); + + testWaitingTxUnblockedOnThreadDeath0(grid(0), client); + } + + /** + * Tests if timeout on first tx unblocks second tx waiting for the locked key. + */ + public void testWaitingTxUnblockedOnTimeout9() throws Exception { + Ignite client = startGrid("client"); + + testWaitingTxUnblockedOnThreadDeath0(client, grid(0)); + } + + /** + * Tests if timeout on first tx unblocks second tx waiting for the locked key. + */ + public void testWaitingTxUnblockedOnTimeout10() throws Exception { + Ignite client = startGrid("client"); + + testWaitingTxUnblockedOnThreadDeath0(client, client); + } + + /** + * Tests timeout object cleanup on tx commit. + */ + public void testTimeoutRemovalOnCommit() throws Exception { + testTimeoutRemoval(grid(0), true); + } + + /** + * Tests timeout object cleanup on tx rollback. + */ + public void testTimeoutRemovalOnRollback() throws Exception { + testTimeoutRemoval(grid(0), false); + } + + /** */ + private void testTimeoutRemoval(IgniteEx near, boolean commit) throws Exception { + GridTimeoutProcessor timeProc = near.context().cache().context().time(); + + try (Transaction tx = near.transactions().txStart()) { + near.cache(CACHE_NAME).put(1, 1); + + if (commit) + tx.commit(); + } + + GridConcurrentSkipListSet set = U.field(timeProc, "timeoutObjs"); + + for (Object obj : set) + assertFalse(obj.getClass().isAssignableFrom(GridNearTxLocal.class)); + } + + /** */ + private void testWaitingTxUnblockedOnTimeout0(final Ignite near, final Ignite other) throws Exception { + final int recordsCnt = 100; + + IgniteInternalFuture<?> fut1 = multithreadedAsync(new Runnable() { + @Override public void run() { + try (Transaction tx = near.transactions().txStart()) { + for (int i = 0; i < recordsCnt; i++) + near.cache(CACHE_NAME).put(i, i); + + blocked.countDown(); + + // Will be unblocked after tx timeout occurs. + U.awaitQuiet(unblocked); + + try { + tx.commit(); + + fail(); + } + catch (IgniteException e) { + log.info("Expecting error: " + e.getMessage()); + } + } + } + }, 1, "First"); + + IgniteInternalFuture<?> fut2 = multithreadedAsync(new Runnable() { + @Override public void run() { + U.awaitQuiet(blocked); + + try (Transaction tx = other.transactions().txStart(TransactionConcurrency.PESSIMISTIC, + TransactionIsolation.REPEATABLE_READ, 0, 1)) { + for (int i = 0; i < recordsCnt; i++) + other.cache(CACHE_NAME).put(i, i); + + // Will wait until timeout on first tx will unblock put. + tx.commit(); + } + } + }, 2, "Second"); + + fut2.get(5, TimeUnit.SECONDS); + + unblocked.countDown(); + + fut1.get(5, TimeUnit.SECONDS); + } + + private void testWaitingTxUnblockedOnThreadDeath0(final Ignite near, final Ignite other) throws Exception { + final int recordsCnt = 100; + + IgniteInternalFuture<?> fut1 = multithreadedAsync(new Runnable() { + @Override public void run() { + near.transactions().txStart(); + for (int i = 0; i < recordsCnt; i++) + near.cache(CACHE_NAME).put(i, i); + + blocked.countDown(); + + throw new IgniteException("Failure"); + } + }, 1, "First"); + + IgniteInternalFuture<?> fut2 = multithreadedAsync(new Runnable() { + @Override public void run() { + U.awaitQuiet(blocked); + + try (Transaction tx = other.transactions().txStart(TransactionConcurrency.PESSIMISTIC, + TransactionIsolation.REPEATABLE_READ, 0, 1)) { + for (int i = 0; i < recordsCnt; i++) + other.cache(CACHE_NAME).put(i, i); + + // Will wait until timeout on first tx will unblock put. + tx.commit(); + } + } + }, 2, "Second"); + + try { + fut1.get(); + + fail(); + } + catch (IgniteCheckedException e) { + // No-op. + } + + fut2.get(5, TimeUnit.SECONDS); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/9da2dde3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxTimeoutHandlingTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxTimeoutHandlingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxTimeoutHandlingTest.java deleted file mode 100644 index 8b17127..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxTimeoutHandlingTest.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.transactions; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteException; -import org.apache.ignite.cache.CacheAtomicityMode; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.configuration.TransactionConfiguration; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.apache.ignite.transactions.Transaction; -import org.apache.ignite.transactions.TransactionConcurrency; -import org.apache.ignite.transactions.TransactionIsolation; - -/** - * Tests ability to rollback not properly closed transaction. - */ -public class TxTimeoutHandlingTest extends GridCommonAbstractTest { - /** */ - private static final long TX_TIMEOUT = 3_000L; - - /** */ - private static final String CACHE_NAME = "test"; - - /** IP finder. */ - private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** */ - private final CountDownLatch l = new CountDownLatch(1); - - /** */ - private final Object mux = new Object(); - - /** */ - private volatile boolean released; - - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); - - cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER)); - - TransactionConfiguration txCfg = new TransactionConfiguration(); - txCfg.setDefaultTxTimeout(TX_TIMEOUT); - - cfg.setTransactionConfiguration(txCfg); - - CacheConfiguration ccfg = new CacheConfiguration(CACHE_NAME); - ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); - - cfg.setCacheConfiguration(ccfg); - - return cfg; - } - - /** */ - public void testTxTimeoutHandling() throws Exception { - try { - final Ignite ignite = startGrid(0); - - IgniteInternalFuture<?> fut1 = multithreadedAsync(new Runnable() { - @Override public void run() { - // Start tx with default settings. - try (Transaction tx = ignite.transactions().txStart()) { - ignite.cache(CACHE_NAME).put(1, 1); - - l.countDown(); - - // Wait longer than default timeout. - synchronized (mux) { - while (!released) { - try { - mux.wait(); - } - catch (InterruptedException e) { - throw new IgniteException(e); - } - } - } - - try { - tx.commit(); - - fail(); - } - catch (IgniteException e) { - // Expect exception - tx is rolled back. - } - } - } - }, 1, "Locker"); - - IgniteInternalFuture<?> fut2 = multithreadedAsync(new Runnable() { - @Override public void run() { - U.awaitQuiet(l); - - // Try to acquire lock. - // Acquisition will be successful then first transaction will be rolled back after timeout. - try (Transaction tx = ignite.transactions().txStart(TransactionConcurrency.PESSIMISTIC, - TransactionIsolation.REPEATABLE_READ, 0, 1)) { - ignite.cache(CACHE_NAME).put(1, 1); - - tx.commit(); - } - } - }, 1, "Waiter"); - - Thread.sleep(TX_TIMEOUT + 1_000); - - fut2.get(); - - startGrid(1); - - released = true; - - synchronized (mux) { - mux.notify(); - } - } - finally { - stopAllGrids(); - } - } -} \ No newline at end of file
