Repository: ignite Updated Branches: refs/heads/master d9f2b2040 -> 5e9710abc
IGNITE-9411: MVCC: better handling of TX timeouts. This closes #4745. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5e9710ab Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5e9710ab Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5e9710ab Branch: refs/heads/master Commit: 5e9710abc4caf031923e2371dbfb27a6e9b4b2ec Parents: d9f2b20 Author: ipavlukhin <vololo...@gmail.com> Authored: Mon Sep 24 09:02:17 2018 +0300 Committer: devozerov <ppoze...@gmail.com> Committed: Mon Sep 24 09:02:17 2018 +0300 ---------------------------------------------------------------------- .../processors/cache/mvcc/MvccUtils.java | 3 +- .../query/h2/DmlStatementsProcessor.java | 12 +- .../processors/query/h2/IgniteH2Indexing.java | 46 ++- .../query/h2/twostep/GridMapQueryExecutor.java | 6 +- .../h2/twostep/GridReduceQueryExecutor.java | 4 +- ...cheMvccSelectForUpdateQueryAbstractTest.java | 8 +- .../cache/mvcc/CacheMvccSqlLockTimeoutTest.java | 353 +++++++++++++++++++ .../mvcc/CacheMvccSqlTxQueriesAbstractTest.java | 31 +- ...MvccSqlTxQueriesWithReducerAbstractTest.java | 38 +- .../testsuites/IgniteCacheMvccSqlTestSuite.java | 2 + 10 files changed, 465 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5e9710ab/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java index 0422459..c3f9a6d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java @@ -685,8 +685,7 @@ public class MvccUtils { */ private static GridNearTxLocal txStart(GridKernalContext ctx, @Nullable GridCacheContext cctx, long timeout) { if (timeout == 0) { - TransactionConfiguration tcfg = cctx != null ? - CU.transactionConfiguration(cctx, ctx.config()) : null; + TransactionConfiguration tcfg = CU.transactionConfiguration(cctx, ctx.config()); if (tcfg != null) timeout = tcfg.getDefaultTxTimeout(); http://git-wip-us.apache.org/repos/asf/ignite/blob/5e9710ab/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java index 6ce43dd..31715f1 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java @@ -508,15 +508,9 @@ public class DmlStatementsProcessor { requestSnapshot(cctx, checkActive(tx)); try (GridNearTxLocal toCommit = commit ? tx : null) { - long timeout; - - if (implicit) - timeout = tx.remainingTime(); - else { - long tm1 = tx.remainingTime(), tm2 = fieldsQry.getTimeout(); - - timeout = tm1 > 0 && tm2 > 0 ? Math.min(tm1, tm2) : Math.max(tm1, tm2); - } + long timeout = implicit + ? tx.remainingTime() + : IgniteH2Indexing.operationTimeout(fieldsQry.getTimeout(), tx); if (cctx.isReplicated() || distributedPlan == null || ((plan.mode() == UpdateMode.INSERT || plan.mode() == UpdateMode.MERGE) && !plan.isLocalSubquery())) { http://git-wip-us.apache.org/repos/asf/ignite/blob/5e9710ab/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 79c431f..96d864d 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -84,6 +84,7 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; import org.apache.ignite.internal.processors.cache.query.QueryTable; import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter; import org.apache.ignite.internal.processors.query.CacheQueryObjectValueContext; import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator; import org.apache.ignite.internal.processors.query.GridQueryCancel; @@ -1062,7 +1063,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { * @param filter Cache name and key filter. * @param enforceJoinOrder Enforce join order of tables in the query. * @param startTx Start transaction flag. - * @param timeout Query timeout in milliseconds. + * @param qryTimeout Query timeout in milliseconds. * @param cancel Query cancel. * @param mvccTracker Query tracker. * @return Query result. @@ -1071,7 +1072,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { @SuppressWarnings("unchecked") GridQueryFieldsResult queryLocalSqlFields(final String schemaName, String qry, @Nullable final Collection<Object> params, final IndexingQueryFilter filter, boolean enforceJoinOrder, - boolean startTx, int timeout, final GridQueryCancel cancel, + boolean startTx, int qryTimeout, final GridQueryCancel cancel, MvccQueryTracker mvccTracker) throws IgniteCheckedException { GridNearTxLocal tx = null; @@ -1099,7 +1100,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { fldsQry.setArgs(params.toArray()); fldsQry.setEnforceJoinOrder(enforceJoinOrder); - fldsQry.setTimeout(timeout, TimeUnit.MILLISECONDS); + fldsQry.setTimeout(qryTimeout, TimeUnit.MILLISECONDS); return dmlProc.updateSqlFieldsLocal(schemaName, conn, p, fldsQry, filter, cancel); } @@ -1119,6 +1120,8 @@ public class IgniteH2Indexing implements GridQueryIndexing { GridNearTxSelectForUpdateFuture sfuFut = null; + int opTimeout = qryTimeout; + if (mvccEnabled) { if (mvccTracker == null) mvccTracker = mvccTracker(stmt, startTx); @@ -1126,11 +1129,9 @@ public class IgniteH2Indexing implements GridQueryIndexing { if (mvccTracker != null) { ctx.mvccSnapshot(mvccTracker.snapshot()); - if ((tx = checkActive(tx(this.ctx))) != null) { - int tm1 = (int)tx.remainingTime(), tm2 = timeout; + tx = checkActive(tx(this.ctx)); - timeout = tm1 > 0 && tm2 > 0 ? Math.min(tm1, tm2) : Math.max(tm1, tm2); - } + opTimeout = operationTimeout(opTimeout, tx); } if (forUpdate) { @@ -1155,7 +1156,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { throw new IgniteSQLException("Failed to lock topology for SELECT FOR UPDATE query.", e); } - sfuFut = new GridNearTxSelectForUpdateFuture(cctx, tx, timeout); + sfuFut = new GridNearTxSelectForUpdateFuture(cctx, tx, opTimeout); sfuFut.initLocal(); } @@ -1182,7 +1183,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { GridNearTxSelectForUpdateFuture sfuFut0 = sfuFut; PreparedStatement stmt0 = stmt; String qry0 = qry; - int timeout0 = timeout; + int timeout0 = opTimeout; return new GridQueryFieldsResultAdapter(meta, null) { @Override public GridCloseableIterator<List<?>> iterator() throws IgniteCheckedException { @@ -1268,6 +1269,21 @@ public class IgniteH2Indexing implements GridQueryIndexing { } } + /** + * @param qryTimeout Query timeout in milliseconds. + * @param tx Transaction. + * @return Timeout for operation in milliseconds based on query and tx timeouts. + */ + public static int operationTimeout(int qryTimeout, IgniteTxAdapter tx) { + if (tx != null) { + int tm1 = (int)tx.remainingTime(), tm2 = qryTimeout; + + return tm1 > 0 && tm2 > 0 ? Math.min(tm1, tm2) : Math.max(tm1, tm2); + } + + return qryTimeout; + } + /** {@inheritDoc} */ @Override public long streamUpdateQuery(String schemaName, String qry, @Nullable Object[] params, IgniteDataStreamer<?, ?> streamer) throws IgniteCheckedException { @@ -1744,7 +1760,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { * @param keepCacheObj Flag to keep cache object. * @param enforceJoinOrder Enforce join order of tables. * @param startTx Start transaction flag. - * @param timeoutMillis Query timeout. + * @param qryTimeout Query timeout. * @param cancel Cancel object. * @param params Query parameters. * @param parts Partitions. @@ -1758,7 +1774,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { final boolean keepCacheObj, final boolean enforceJoinOrder, boolean startTx, - final int timeoutMillis, + final int qryTimeout, final GridQueryCancel cancel, final Object[] params, final int[] parts, @@ -1770,13 +1786,17 @@ public class IgniteH2Indexing implements GridQueryIndexing { final MvccQueryTracker tracker = mvccTracker == null && qry.mvccEnabled() ? MvccUtils.mvccTracker(ctx.cache().context().cacheContext(qry.cacheIds().get(0)), startTx) : mvccTracker; + GridNearTxLocal tx = tx(ctx); + if (qry.forUpdate()) - qry.forUpdate(checkActive(tx(ctx)) != null); + qry.forUpdate(checkActive(tx) != null); + + int opTimeout = operationTimeout(qryTimeout, tx); return new Iterable<List<?>>() { @SuppressWarnings("NullableProblems") @Override public Iterator<List<?>> iterator() { - return rdcQryExec.query(schemaName, qry, keepCacheObj, enforceJoinOrder, timeoutMillis, + return rdcQryExec.query(schemaName, qry, keepCacheObj, enforceJoinOrder, opTimeout, cancel, params, parts, lazy, tracker); } }; http://git-wip-us.apache.org/repos/asf/ignite/blob/5e9710ab/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index 9166604..ab60746 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -925,7 +925,9 @@ public class GridMapQueryExecutor { h2.bindParameters(stmt, params0); - rs = h2.executeSqlQueryWithTimer(stmt, conn, sql, params0, timeout, qr.queryCancel(qryIdx)); + int opTimeout = IgniteH2Indexing.operationTimeout(timeout, tx); + + rs = h2.executeSqlQueryWithTimer(stmt, conn, sql, params0, opTimeout, qr.queryCancel(qryIdx)); if (inTx) { ResultSetEnlistFuture enlistFut = ResultSetEnlistFuture.future( @@ -937,7 +939,7 @@ public class GridMapQueryExecutor { txDetails.miniId(), parts, tx, - timeout, + opTimeout, mainCctx, rs ); http://git-wip-us.apache.org/repos/asf/ignite/blob/5e9710ab/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 96c88ff..6474d55 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -113,7 +113,9 @@ import org.jetbrains.annotations.Nullable; import static java.util.Collections.singletonList; import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_RETRY_TIMEOUT; import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE; -import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.*; +import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.checkActive; +import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.mvccEnabled; +import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.tx; import static org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery.EMPTY_PARAMS; import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REDUCE; http://git-wip-us.apache.org/repos/asf/ignite/blob/5e9710ab/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSelectForUpdateQueryAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSelectForUpdateQueryAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSelectForUpdateQueryAbstractTest.java index 739aaf8..5c81974 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSelectForUpdateQueryAbstractTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSelectForUpdateQueryAbstractTest.java @@ -101,22 +101,20 @@ public abstract class CacheMvccSelectForUpdateQueryAbstractTest extends CacheMvc /** - * + * @throws Exception If failed. */ public void testSelectForUpdateLocal() throws Exception { doTestSelectForUpdateLocal("Person", false); } /** - * * @throws Exception If failed. */ - public void testSelectForUpdateOutsideTx() throws Exception { + public void testSelectForUpdateOutsideTxDistributed() throws Exception { doTestSelectForUpdateDistributed("Person", true); } /** - * * @throws Exception If failed. */ public void testSelectForUpdateOutsideTxLocal() throws Exception { @@ -261,6 +259,8 @@ public abstract class CacheMvccSelectForUpdateQueryAbstractTest extends CacheMvc checkLocks("Person", keys, true); tx.rollback(); + + checkLocks("Person", keys, false); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5e9710ab/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlLockTimeoutTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlLockTimeoutTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlLockTimeoutTest.java new file mode 100644 index 0000000..eae79a5 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlLockTimeoutTest.java @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.UnaryOperator; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.TransactionConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.transactions.Transaction; + +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; + +/** */ +public class CacheMvccSqlLockTimeoutTest extends CacheMvccAbstractTest { + /** */ + private static final int TIMEOUT_MILLIS = 200; + + /** */ + private UnaryOperator<IgniteConfiguration> cfgCustomizer = UnaryOperator.identity(); + + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + throw new RuntimeException("Is not used in current test"); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + return cfgCustomizer.apply(super.getConfiguration(gridName)); + } + + /** + * @throws Exception if failed. + */ + public void testLockTimeoutsForPartitionedCache() throws Exception { + checkLockTimeouts(partitionedCacheConfig()); + } + + /** + * @throws Exception if failed. + */ + public void testLockTimeoutsForReplicatedCache() throws Exception { + checkLockTimeouts(replicatedCacheConfig()); + } + + /** + * @throws Exception if failed. + */ + public void testLockTimeoutsAfterDefaultTxTimeoutForPartitionedCache() throws Exception { + checkLockTimeoutsAfterDefaultTxTimeout(partitionedCacheConfig()); + } + + /** + * @throws Exception if failed. + */ + public void testLockTimeoutsAfterDefaultTxTimeoutForReplicatedCache() throws Exception { + checkLockTimeoutsAfterDefaultTxTimeout(replicatedCacheConfig()); + } + + /** + * @throws Exception if failed. + */ + public void testConcurrentForPartitionedCache() throws Exception { + checkTimeoutsConcurrent(partitionedCacheConfig()); + } + + /** + * @throws Exception if failed. + */ + public void testConcurrentForReplicatedCache() throws Exception { + checkTimeoutsConcurrent(replicatedCacheConfig()); + } + + /** */ + private CacheConfiguration<?, ?> partitionedCacheConfig() { + return baseCacheConfig() + .setCacheMode(PARTITIONED) + .setBackups(1); + } + + /** */ + private CacheConfiguration<?, ?> replicatedCacheConfig() { + return baseCacheConfig().setCacheMode(REPLICATED); + } + + /** */ + private CacheConfiguration<?, ?> baseCacheConfig() { + return new CacheConfiguration<>("test") + .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT) + .setSqlSchema("PUBLIC") + .setIndexedTypes(Integer.class, Integer.class); + } + + /** */ + private void checkLockTimeouts(CacheConfiguration<?, ?> ccfg) throws Exception { + startGridsMultiThreaded(2); + + IgniteEx ignite = grid(0); + + ignite.createCache(ccfg); + + AtomicInteger keyCntr = new AtomicInteger(); + + int nearKey = keyForNode(ignite.affinity("test"), keyCntr, ignite.localNode()); + int otherKey = keyForNode(ignite.affinity("test"), keyCntr, grid(1).localNode()); + + TimeoutChecker timeoutChecker = new TimeoutChecker(ignite, "test"); + + timeoutChecker.checkScenario(TimeoutMode.STMT, TxStartMode.EXPLICIT, nearKey); + + timeoutChecker.checkScenario(TimeoutMode.STMT, TxStartMode.EXPLICIT, otherKey); + + timeoutChecker.checkScenario(TimeoutMode.STMT, TxStartMode.IMPLICIT, nearKey); + + timeoutChecker.checkScenario(TimeoutMode.STMT, TxStartMode.IMPLICIT, otherKey); + + // explicit tx timeout has no sense for implicit transaction + timeoutChecker.checkScenario(TimeoutMode.TX, TxStartMode.EXPLICIT, nearKey); + + timeoutChecker.checkScenario(TimeoutMode.TX, TxStartMode.EXPLICIT, otherKey); + } + + /** */ + private void checkLockTimeoutsAfterDefaultTxTimeout(CacheConfiguration<?, ?> ccfg) throws Exception { + cfgCustomizer = cfg -> + cfg.setTransactionConfiguration(new TransactionConfiguration().setDefaultTxTimeout(TIMEOUT_MILLIS)); + + startGridsMultiThreaded(2); + + IgniteEx ignite = grid(0); + + ignite.createCache(ccfg); + + AtomicInteger keyCntr = new AtomicInteger(); + + int nearKey = keyForNode(ignite.affinity("test"), keyCntr, ignite.localNode()); + int otherKey = keyForNode(ignite.affinity("test"), keyCntr, grid(1).localNode()); + + TimeoutChecker timeoutChecker = new TimeoutChecker(ignite, "test"); + + timeoutChecker.checkScenario(TimeoutMode.TX_DEFAULT, TxStartMode.EXPLICIT, nearKey); + + timeoutChecker.checkScenario(TimeoutMode.TX_DEFAULT, TxStartMode.EXPLICIT, otherKey); + + timeoutChecker.checkScenario(TimeoutMode.TX_DEFAULT, TxStartMode.IMPLICIT, nearKey); + + timeoutChecker.checkScenario(TimeoutMode.TX_DEFAULT, TxStartMode.IMPLICIT, otherKey); + } + + /** */ + private static class TimeoutChecker { + /** */ + final IgniteEx ignite; + /** */ + final String cacheName; + + /** */ + TimeoutChecker(IgniteEx ignite, String cacheName) { + this.ignite = ignite; + this.cacheName = cacheName; + } + + /** */ + void checkScenario(TimeoutMode timeoutMode, TxStartMode txStartMode, int key) throws Exception { + // 999 is used as bound to enforce query execution with obtaining cursor before enlist + assert key <= 999; + + try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 60_000, 1)) { + ignite.cache(cacheName).query(new SqlFieldsQuery("merge into Integer(_key, _val) values(?, 1)") + .setArgs(key)); + + tx.commit(); + } + + ensureTimeIsOut("insert into Integer(_key, _val) values(?, 42)", key, timeoutMode, txStartMode); + ensureTimeIsOut("merge into Integer(_key, _val) values(?, 42)", key, timeoutMode, txStartMode); + ensureTimeIsOut("update Integer set _val = 42 where _key = ?", key, timeoutMode, txStartMode); + ensureTimeIsOut("update Integer set _val = 42 where _key = ? or _key > 999", key, timeoutMode, txStartMode); + ensureTimeIsOut("delete from Integer where _key = ?", key, timeoutMode, txStartMode); + ensureTimeIsOut("delete from Integer where _key = ? or _key > 999", key, timeoutMode, txStartMode); + + // SELECT ... FOR UPDATE locking entries has no meaning for implicit transaction + if (txStartMode != TxStartMode.IMPLICIT) { + ensureTimeIsOut("select * from Integer where _key = ? for update", key, timeoutMode, txStartMode); + ensureTimeIsOut( + "select * from Integer where _key = ? or _key > 999 for update", key, timeoutMode, txStartMode); + } + } + + /** */ + void ensureTimeIsOut(String sql, int key, TimeoutMode timeoutMode, TxStartMode txStartMode) throws Exception { + assert txStartMode == TxStartMode.EXPLICIT || timeoutMode != TimeoutMode.TX; + + IgniteCache<?, ?> cache = ignite.cache(cacheName); + + int oldVal = (Integer)cache + .query(new SqlFieldsQuery("select _val from Integer where _key = ?").setArgs(key)) + .getAll().get(0).get(0); + + try (Transaction tx1 = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 6_000, 1)) { + cache.query(new SqlFieldsQuery("update Integer set _val = 42 where _key = ?").setArgs(key)); + + try { + CompletableFuture.runAsync(() -> { + SqlFieldsQuery qry = new SqlFieldsQuery(sql).setArgs(key); + + try (Transaction tx2 = txStartMode == TxStartMode.EXPLICIT ? startTx(timeoutMode): null) { + if (timeoutMode == TimeoutMode.STMT) + qry.setTimeout(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); + + cache.query(qry).getAll(); + + if (tx2 != null) + tx2.commit(); + } + finally { + ignite.context().cache().context().tm().resetContext(); + } + }).get(); + + fail("Timeout exception should be thrown"); + } + catch (ExecutionException e) { + assertTrue(msgContains(e, "Failed to acquire lock within provided timeout for transaction") + || msgContains(e, "Failed to finish transaction because it has been rolled back")); + } + + // assert that outer tx has not timed out + cache.query(new SqlFieldsQuery("update Integer set _val = 42 where _key = ?").setArgs(key)); + + tx1.rollback(); + } + + int newVal = (Integer)cache + .query(new SqlFieldsQuery("select _val from Integer where _key = ?").setArgs(key)) + .getAll().get(0).get(0); + + assertEquals(oldVal, newVal); + } + + /** */ + private Transaction startTx(TimeoutMode timeoutMode) { + return timeoutMode == TimeoutMode.TX + ? ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, TIMEOUT_MILLIS, 1) + : ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ); + } + } + + /** */ + private static boolean msgContains(Throwable e, String str) { + return e.getMessage() != null && e.getMessage().contains(str); + } + + /** */ + private enum TimeoutMode { + /** */ + TX, + /** */ + TX_DEFAULT, + /** */ + STMT + } + + /** */ + private enum TxStartMode { + /** */ + EXPLICIT, + /** */ + IMPLICIT + } + + /** */ + private void checkTimeoutsConcurrent(CacheConfiguration<?, ?> ccfg) throws Exception { + startGridsMultiThreaded(2); + + IgniteEx ignite = grid(0); + + IgniteCache<?, ?> cache = ignite.createCache(ccfg); + + AtomicInteger keyCntr = new AtomicInteger(); + + List<Integer> keys = new ArrayList<>(); + + for (int i = 0; i < 5; i++) + keys.add(keyForNode(grid(0).affinity("test"), keyCntr, ignite.localNode())); + + for (int i = 0; i < 5; i++) + keys.add(keyForNode(grid(1).affinity("test"), keyCntr, ignite.localNode())); + + CompletableFuture.allOf( + CompletableFuture.runAsync(() -> mergeInRandomOrder(ignite, cache, keys)), + CompletableFuture.runAsync(() -> mergeInRandomOrder(ignite, cache, keys)), + CompletableFuture.runAsync(() -> mergeInRandomOrder(ignite, cache, keys)) + ).join(); + } + + /** */ + private void mergeInRandomOrder(IgniteEx ignite, IgniteCache<?, ?> cache, List<Integer> keys) { + List<Integer> keys0 = new ArrayList<>(keys); + + for (int i = 0; i < 100; i++) { + Collections.shuffle(keys0); + + try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + SqlFieldsQuery qry = new SqlFieldsQuery("merge into Integer(_key, _val) values(?, ?)") + .setTimeout(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); + + int op = 0; + + for (Integer key : keys0) + cache.query(qry.setArgs(key, op++)); + + tx.commit(); + } + catch (Exception e) { + assertTrue(msgContains(e, "Failed to acquire lock within provided timeout for transaction") + || msgContains(e, "Mvcc version mismatch")); + } + finally { + ignite.context().cache().context().tm().resetContext(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5e9710ab/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java index 2aad2d4..b881f02 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java @@ -649,7 +649,27 @@ public abstract class CacheMvccSqlTxQueriesAbstractTest extends CacheMvccAbstrac /** * @throws Exception If failed. */ - public void testQueryDeadlock() throws Exception { + public void testQueryDeadlockWithTxTimeout() throws Exception { + checkQueryDeadlock(TimeoutMode.TX); + } + + /** + * @throws Exception If failed. + */ + public void testQueryDeadlockWithStmtTimeout() throws Exception { + checkQueryDeadlock(TimeoutMode.STMT); + } + + /** */ + private enum TimeoutMode { + /** */ + TX, + /** */ + STMT + } + + /** */ + private void checkQueryDeadlock(TimeoutMode timeoutMode) throws Exception { ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 2, DFLT_PARTITION_COUNT) .setIndexedTypes(Integer.class, Integer.class); @@ -671,7 +691,8 @@ public abstract class CacheMvccSqlTxQueriesAbstractTest extends CacheMvccAbstrac try { try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { - tx.timeout(TX_TIMEOUT); + if (timeoutMode == TimeoutMode.TX) + tx.timeout(TX_TIMEOUT); IgniteCache<Object, Object> cache0 = node.cache(DEFAULT_CACHE_NAME); @@ -680,6 +701,9 @@ public abstract class CacheMvccSqlTxQueriesAbstractTest extends CacheMvccAbstrac SqlFieldsQuery qry = new SqlFieldsQuery((id % 2) == 0 ? qry1 : qry2); + if (timeoutMode == TimeoutMode.STMT) + qry.setTimeout(TX_TIMEOUT, TimeUnit.MILLISECONDS); + try (FieldsQueryCursor<List<?>> cur = cache0.query(qry)) { cur.getAll(); } @@ -688,6 +712,9 @@ public abstract class CacheMvccSqlTxQueriesAbstractTest extends CacheMvccAbstrac qry = new SqlFieldsQuery((id % 2) == 0 ? qry2 : qry1); + if (timeoutMode == TimeoutMode.STMT) + qry.setTimeout(TX_TIMEOUT, TimeUnit.MILLISECONDS); + try (FieldsQueryCursor<List<?>> cur = cache0.query(qry)) { cur.getAll(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/5e9710ab/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesWithReducerAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesWithReducerAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesWithReducerAbstractTest.java index 69cf108..a7cf292 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesWithReducerAbstractTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesWithReducerAbstractTest.java @@ -41,6 +41,7 @@ import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.transactions.Transaction; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause; import static org.apache.ignite.testframework.GridTestUtils.runMultiThreaded; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; @@ -505,14 +506,33 @@ public abstract class CacheMvccSqlTxQueriesWithReducerAbstractTest extends Cache /** * @throws Exception If failed. */ - public void testQueryReducerDeadlockInsert() throws Exception { + public void testQueryReducerDeadlockInsertWithTxTimeout() throws Exception { + checkQueryReducerDeadlockInsert(TimeoutMode.TX); + } + + /** + * @throws Exception If failed. + */ + public void testQueryReducerDeadlockInsertWithStmtTimeout() throws Exception { + checkQueryReducerDeadlockInsert(TimeoutMode.STMT); + } + + /** */ + private enum TimeoutMode { + /** */ + TX, + /** */ + STMT + } + + /** */ + public void checkQueryReducerDeadlockInsert(TimeoutMode timeoutMode) throws Exception { ccfgs = new CacheConfiguration[] { cacheConfiguration(cacheMode(), FULL_SYNC, 2, DFLT_PARTITION_COUNT) .setName("int") .setIndexedTypes(Integer.class, Integer.class), cacheConfiguration(cacheMode(), FULL_SYNC, 2, DFLT_PARTITION_COUNT) - .setIndexedTypes(Integer.class, - CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue.class), + .setIndexedTypes(Integer.class, CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue.class), }; startGridsMultiThreaded(2); @@ -544,7 +564,8 @@ public abstract class CacheMvccSqlTxQueriesWithReducerAbstractTest extends Cache try { try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { - tx.timeout(TIMEOUT); + if (timeoutMode == TimeoutMode.TX) + tx.timeout(TIMEOUT); String sqlText = "INSERT INTO MvccTestSqlIndexValue (_key, idxVal1) " + "SELECT DISTINCT _key, _val FROM \"int\".Integer ORDER BY _key"; @@ -554,6 +575,9 @@ public abstract class CacheMvccSqlTxQueriesWithReducerAbstractTest extends Cache SqlFieldsQuery qry = new SqlFieldsQuery((id % 2) == 0 ? sqlAsc : sqlDesc); + if (timeoutMode == TimeoutMode.STMT) + qry.setTimeout(TIMEOUT, TimeUnit.MILLISECONDS); + IgniteCache<Object, Object> cache0 = node.cache(DEFAULT_CACHE_NAME); cache0.query(qry).getAll(); @@ -562,6 +586,9 @@ public abstract class CacheMvccSqlTxQueriesWithReducerAbstractTest extends Cache qry = new SqlFieldsQuery((id % 2) == 0 ? sqlDesc : sqlAsc); + if (timeoutMode == TimeoutMode.STMT) + qry.setTimeout(TIMEOUT, TimeUnit.MILLISECONDS); + cache0.query(qry).getAll(); tx.commit(); @@ -577,8 +604,9 @@ public abstract class CacheMvccSqlTxQueriesWithReducerAbstractTest extends Cache assertNotNull(ex0); - if (!X.hasCause(ex0, IgniteTxTimeoutCheckedException.class)) + assertThrowsWithCause(() -> { throw ex0; + }, IgniteTxTimeoutCheckedException.class); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/5e9710ab/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java index ec60596..888b1ba 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java @@ -39,6 +39,7 @@ import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccScanQueryWithCo import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSizeTest; import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSizeWithConcurrentJdbcTransactionTest; import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSqlConfigurationValidationTest; +import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSqlLockTimeoutTest; import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccStreamingInsertTest; import org.apache.ignite.internal.processors.query.h2.GridIndexRebuildWithMvccEnabledSelfTest; @@ -57,6 +58,7 @@ public class IgniteCacheMvccSqlTestSuite extends TestSuite { suite.addTestSuite(CacheMvccDmlSimpleTest.class); suite.addTestSuite(SqlTransactionsCommandsWithMvccEnabledSelfTest.class); suite.addTestSuite(CacheMvccSizeTest.class); + suite.addTestSuite(CacheMvccSqlLockTimeoutTest.class); suite.addTestSuite(GridIndexRebuildWithMvccEnabledSelfTest.class);