ignite-6181
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5ca6aff1 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5ca6aff1 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5ca6aff1 Branch: refs/heads/ignite-6181-2 Commit: 5ca6aff13a59e0050cd5d94b7460aad861266065 Parents: 60c2bb5 Author: sboikov <sboi...@gridgain.com> Authored: Fri Sep 22 11:03:47 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri Sep 22 11:17:11 2017 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/cache/CacheMetrics.java | 2 - .../processors/cache/CacheMetricsImpl.java | 4 +- .../colocated/GridDhtColocatedLockFuture.java | 1 + .../near/GridNearFastFinishFuture.java | 82 ------------ .../distributed/near/GridNearLockFuture.java | 1 + .../near/GridNearTxFastFinishFuture.java | 82 ++++++++++++ .../near/GridNearTxFinishFuture.java | 1 + .../cache/distributed/near/GridNearTxLocal.java | 127 ++++++++++--------- .../cache/transactions/IgniteTxManager.java | 1 + .../processors/cache/CacheTxFastFinishTest.java | 9 +- 10 files changed, 158 insertions(+), 152 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5ca6aff1/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java index 470645b..20ea692 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java @@ -269,7 +269,6 @@ public interface CacheMetrics { * * @return Thread map size. */ - @Deprecated public int getTxThreadMapSize(); /** @@ -319,7 +318,6 @@ public interface CacheMetrics { * * @return DHT thread map size. */ - @Deprecated public int getTxDhtThreadMapSize(); /** http://git-wip-us.apache.org/repos/asf/ignite/blob/5ca6aff1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java index d608435..413b60d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java @@ -298,7 +298,7 @@ public class CacheMetricsImpl implements CacheMetrics { /** {@inheritDoc} */ @Override public int getTxThreadMapSize() { - return 0; + return cctx.tm().threadMapSize(); } /** {@inheritDoc} */ @@ -328,7 +328,7 @@ public class CacheMetricsImpl implements CacheMetrics { /** {@inheritDoc} */ @Override public int getTxDhtThreadMapSize() { - return 0; + return cctx.tm().threadMapSize(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/5ca6aff1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 6c1f9dd..e4f4601 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -153,6 +153,7 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF private final Map<KeyCacheObject, IgniteBiTuple<GridCacheVersion, CacheObject>> valMap; /** */ + @SuppressWarnings("UnusedDeclaration") private volatile int done; /** Trackable flag (here may be non-volatile). */ http://git-wip-us.apache.org/repos/asf/ignite/blob/5ca6aff1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearFastFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearFastFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearFastFinishFuture.java deleted file mode 100644 index abad105..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearFastFinishFuture.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed.near; - -import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; -import org.apache.ignite.internal.util.future.GridFutureAdapter; - -import static org.apache.ignite.transactions.TransactionState.COMMITTED; -import static org.apache.ignite.transactions.TransactionState.COMMITTING; -import static org.apache.ignite.transactions.TransactionState.PREPARED; -import static org.apache.ignite.transactions.TransactionState.PREPARING; -import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK; -import static org.apache.ignite.transactions.TransactionState.ROLLING_BACK; - -/** - * - */ -public class GridNearFastFinishFuture extends GridFutureAdapter<IgniteInternalTx> implements NearTxFinishFuture { - /** */ - private final GridNearTxLocal tx; - - /** */ - private final boolean commit; - - /** - * @param tx Transaction. - * @param commit Commit flag. - */ - GridNearFastFinishFuture(GridNearTxLocal tx, boolean commit) { - this.tx = tx; - this.commit = commit; - } - - /** {@inheritDoc} */ - @Override public boolean commit() { - return commit; - } - - /** - * - */ - public void finish() { - try { - if (commit) { - tx.state(PREPARING); - tx.state(PREPARED); - tx.state(COMMITTING); - - tx.context().tm().fastFinishTx(tx, true); - - tx.state(COMMITTED); - } - else { - tx.state(PREPARING); - tx.state(PREPARED); - tx.state(ROLLING_BACK); - - tx.context().tm().fastFinishTx(tx, false); - - tx.state(ROLLED_BACK); - } - } - finally { - onDone(tx); - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/5ca6aff1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index 32f9d5b..3d9989d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -149,6 +149,7 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo private final Map<KeyCacheObject, IgniteBiTuple<GridCacheVersion, CacheObject>> valMap; /** */ + @SuppressWarnings("UnusedDeclaration") private volatile int done; /** Keys locked so far. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/5ca6aff1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFastFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFastFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFastFinishFuture.java new file mode 100644 index 0000000..7222697 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFastFinishFuture.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.util.future.GridFutureAdapter; + +import static org.apache.ignite.transactions.TransactionState.COMMITTED; +import static org.apache.ignite.transactions.TransactionState.COMMITTING; +import static org.apache.ignite.transactions.TransactionState.PREPARED; +import static org.apache.ignite.transactions.TransactionState.PREPARING; +import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK; +import static org.apache.ignite.transactions.TransactionState.ROLLING_BACK; + +/** + * + */ +public class GridNearTxFastFinishFuture extends GridFutureAdapter<IgniteInternalTx> implements NearTxFinishFuture { + /** */ + private final GridNearTxLocal tx; + + /** */ + private final boolean commit; + + /** + * @param tx Transaction. + * @param commit Commit flag. + */ + GridNearTxFastFinishFuture(GridNearTxLocal tx, boolean commit) { + this.tx = tx; + this.commit = commit; + } + + /** {@inheritDoc} */ + @Override public boolean commit() { + return commit; + } + + /** + * + */ + public void finish() { + try { + if (commit) { + tx.state(PREPARING); + tx.state(PREPARED); + tx.state(COMMITTING); + + tx.context().tm().fastFinishTx(tx, true); + + tx.state(COMMITTED); + } + else { + tx.state(PREPARING); + tx.state(PREPARED); + tx.state(ROLLING_BACK); + + tx.context().tm().fastFinishTx(tx, false); + + tx.state(ROLLED_BACK); + } + } + finally { + onDone(tx); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5ca6aff1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index d13c77f..b6a8855 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -398,6 +398,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit * Initializes future. * * @param commit Commit flag. + * @param clearThreadMap If {@code true} removes {@link GridNearTxLocal} from thread map. */ @SuppressWarnings("ForLoopReplaceableByForEach") public void finish(boolean commit, boolean clearThreadMap) { http://git-wip-us.apache.org/repos/asf/ignite/blob/5ca6aff1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 5e5de80..8b043d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -3191,64 +3191,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou } /** - * @param fut Already started finish future. - * @param commit Commit flag. - * @return Finish future. - */ - private IgniteInternalFuture<IgniteInternalTx> chainFinishFuture(final NearTxFinishFuture fut, final boolean commit) { - assert fut != null; - - if (fut.commit() != commit) { - final GridNearTxLocal tx = this; - - if (!commit) { - final GridNearTxFinishFuture rollbackFut = new GridNearTxFinishFuture<>(cctx, this, false); - - fut.listen(new IgniteInClosure<IgniteInternalFuture<IgniteInternalTx>>() { - @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut0) { - if (FINISH_FUT_UPD.compareAndSet(tx, fut, rollbackFut)) { - if (tx.state() == COMMITTED) { - if (log.isDebugEnabled()) - log.debug("Failed to rollback, transaction is already committed: " + tx); - - rollbackFut.forceFinish(); - - assert rollbackFut.isDone() : rollbackFut; - } - else { - if (!cctx.mvcc().addFuture(rollbackFut, rollbackFut.futureId())) - return; - - rollbackFut.finish(false, true); - } - } - } - }); - - return rollbackFut; - } - else { - final GridFutureAdapter<IgniteInternalTx> fut0 = new GridFutureAdapter<>(); - - fut.listen(new IgniteInClosure<IgniteInternalFuture<IgniteInternalTx>>() { - @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) { - if (timedOut()) - fut0.onDone(new IgniteTxTimeoutCheckedException("Failed to commit transaction, " + - "transaction is concurrently rolled back on timeout: " + tx)); - else - fut0.onDone(new IgniteCheckedException("Failed to commit transaction, " + - "transaction is concurrently rolled back: " + tx)); - } - }); - - return fut0; - } - } - - return fut; - } - - /** * @return Finish future. */ public IgniteInternalFuture<IgniteInternalTx> commitNearTxLocalAsync() { @@ -3261,9 +3203,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou return chainFinishFuture(fut, true); if (fastFinish()) { - GridNearFastFinishFuture fut0; + GridNearTxFastFinishFuture fut0; - if (!FINISH_FUT_UPD.compareAndSet(this, null, fut0 = new GridNearFastFinishFuture(this, true))) + if (!FINISH_FUT_UPD.compareAndSet(this, null, fut0 = new GridNearTxFastFinishFuture(this, true))) return chainFinishFuture(finishFut, true); fut0.finish(); @@ -3343,9 +3285,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou return chainFinishFuture(finishFut, false); if (fastFinish()) { - GridNearFastFinishFuture fut0; + GridNearTxFastFinishFuture fut0; - if (!FINISH_FUT_UPD.compareAndSet(this, null, fut0 = new GridNearFastFinishFuture(this, false))) + if (!FINISH_FUT_UPD.compareAndSet(this, null, fut0 = new GridNearTxFastFinishFuture(this, false))) return chainFinishFuture(finishFut, false); fut0.finish(); @@ -3401,6 +3343,64 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou } /** + * @param fut Already started finish future. + * @param commit Commit flag. + * @return Finish future. + */ + private IgniteInternalFuture<IgniteInternalTx> chainFinishFuture(final NearTxFinishFuture fut, final boolean commit) { + assert fut != null; + + if (fut.commit() != commit) { + final GridNearTxLocal tx = this; + + if (!commit) { + final GridNearTxFinishFuture rollbackFut = new GridNearTxFinishFuture<>(cctx, this, false); + + fut.listen(new IgniteInClosure<IgniteInternalFuture<IgniteInternalTx>>() { + @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut0) { + if (FINISH_FUT_UPD.compareAndSet(tx, fut, rollbackFut)) { + if (tx.state() == COMMITTED) { + if (log.isDebugEnabled()) + log.debug("Failed to rollback, transaction is already committed: " + tx); + + rollbackFut.forceFinish(); + + assert rollbackFut.isDone() : rollbackFut; + } + else { + if (!cctx.mvcc().addFuture(rollbackFut, rollbackFut.futureId())) + return; + + rollbackFut.finish(false, true); + } + } + } + }); + + return rollbackFut; + } + else { + final GridFutureAdapter<IgniteInternalTx> fut0 = new GridFutureAdapter<>(); + + fut.listen(new IgniteInClosure<IgniteInternalFuture<IgniteInternalTx>>() { + @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) { + if (timedOut()) + fut0.onDone(new IgniteTxTimeoutCheckedException("Failed to commit transaction, " + + "transaction is concurrently rolled back on timeout: " + tx)); + else + fut0.onDone(new IgniteCheckedException("Failed to commit transaction, " + + "transaction is concurrently rolled back: " + tx)); + } + }); + + return fut0; + } + } + + return fut; + } + + /** * @return {@code True} if 'fast finish' path can be used for transaction completion. */ private boolean fastFinish() { @@ -3782,6 +3782,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou } } finally { + // It is possible tx was rolled back asynchronously on timeout and thread map is not cleared yet. boolean cleanup = state == ROLLED_BACK && timedOut(); if (cleanup) @@ -4128,6 +4129,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou cctx.kernalContext().closure().runLocalSafe(new Runnable() { @Override public void run() { + // Note: if rollback asynchonously on timeout should not clear thread map + // since thread started tx still should be able to see this tx. rollbackNearTxLocalAsync(true); } }); http://git-wip-us.apache.org/repos/asf/ignite/blob/5ca6aff1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 208bd61..77634bd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -1269,6 +1269,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * Rolls back a transaction. * * @param tx Transaction to rollback. + * @param clearThreadMap {@code True} if need remove tx from thread map. */ public void rollbackTx(IgniteInternalTx tx, boolean clearThreadMap) { assert tx != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/5ca6aff1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheTxFastFinishTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheTxFastFinishTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheTxFastFinishTest.java index 9f4910a..79316bf 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheTxFastFinishTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheTxFastFinishTest.java @@ -24,6 +24,7 @@ import org.apache.ignite.IgniteTransactions; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFastFinishFuture; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; @@ -210,8 +211,7 @@ public class CacheTxFastFinishTest extends GridCommonAbstractTest { IgniteInternalTx tx0 = ((TransactionProxyImpl)tx).tx(); assertNull(fieldValue(tx0, "prepFut")); - assertNull(fieldValue(tx0, "commitFut")); - assertNull(fieldValue(tx0, "rollbackFut")); + assertTrue(fieldValue(tx0, "finishFut") instanceof GridNearTxFastFinishFuture); } /** @@ -225,12 +225,13 @@ public class CacheTxFastFinishTest extends GridCommonAbstractTest { tx.commit(); assertNotNull(fieldValue(tx0, "prepFut")); - assertNotNull(fieldValue(tx0, "commitFut")); + assertNotNull(fieldValue(tx0, "finishFut")); } else { tx.rollback(); - assertNotNull(fieldValue(tx0, "rollbackFut")); + assertNull(fieldValue(tx0, "prepFut")); + assertNotNull(fieldValue(tx0, "finishFut")); } }