Repository: ignite Updated Branches: refs/heads/ignite-2.7 37327917e -> 42ab0ebe3
IGNITE-9944: MVCC: Fixed "first request" handling for DHT transactions. This closes #5040. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/42ab0ebe Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/42ab0ebe Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/42ab0ebe Branch: refs/heads/ignite-2.7 Commit: 42ab0ebe395e040e478e159ff982a6ec8c7c4894 Parents: 3732791 Author: ipavlukhin <[email protected]> Authored: Mon Oct 22 22:15:20 2018 +0300 Committer: devozerov <[email protected]> Committed: Mon Oct 22 22:16:52 2018 +0300 ---------------------------------------------------------------------- .../dht/GridDhtTxAbstractEnlistFuture.java | 8 +- .../CacheMvccRemoteTxOnNearNodeStartTest.java | 90 ++++++++++++++++++++ .../testsuites/IgniteCacheMvccTestSuite.java | 3 + 3 files changed, 100 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/42ab0ebe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java index e2c8237..68669b7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java @@ -193,6 +193,9 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd /** Moving partitions. */ private Map<Integer, Boolean> movingParts; + /** Map for tracking nodes to which first request was already sent in order to send smaller subsequent requests. */ + private final Set<ClusterNode> firstReqSent = new HashSet<>(); + /** * @param nearNodeId Near node ID. * @param nearLockVer Near lock version. @@ -823,9 +826,12 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd GridDhtTxQueryEnlistRequest req; - if (newRemoteTx(node)) { + if (newRemoteTx(node)) addNewRemoteTxNode(node); + if (!firstReqSent.contains(node)) { + firstReqSent.add(node); + // If this is a first request to this node, send full info. req = new GridDhtTxQueryFirstEnlistRequest(cctx.cacheId(), futId, http://git-wip-us.apache.org/repos/asf/ignite/blob/42ab0ebe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccRemoteTxOnNearNodeStartTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccRemoteTxOnNearNodeStartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccRemoteTxOnNearNodeStartTest.java new file mode 100644 index 0000000..ee23e38 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccRemoteTxOnNearNodeStartTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import com.google.common.collect.ImmutableMap; +import java.util.ArrayList; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; + +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; + +/** */ +public class CacheMvccRemoteTxOnNearNodeStartTest extends CacheMvccAbstractTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return CacheMode.PARTITIONED; + } + + /** + * Ensures that remote transaction on near node is started + * when first request is sent to OWNING partition and second to MOVING partition. + * @throws Exception if failed. + */ + public void testRemoteTxOnNearNodeIsStartedIfPartitionIsMoving() throws Exception { + startGridsMultiThreaded(3); + + IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME) + .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT) + .setCacheMode(cacheMode()) + .setBackups(1) + ); + + ArrayList<Integer> keys = new ArrayList<>(); + + Affinity<Object> aff = grid(0).affinity(DEFAULT_CACHE_NAME); + + for (int i = 0; i < 100; i++) { + if (aff.isPrimary(grid(1).localNode(), i) && aff.isBackup(grid(0).localNode(), i)) { + keys.add(i); + break; + } + } + + for (int i = 0; i < 100; i++) { + if (aff.isPrimary(grid(1).localNode(), i) && aff.isBackup(grid(2).localNode(), i)) { + keys.add(i); + break; + } + } + + assert keys.size() == 2; + + stopGrid(2); + + try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.putAll(ImmutableMap.of( + keys.get(0), 0, + keys.get(1), 1) + ); + + tx.commit(); + } + + // assert transaction was committed without errors + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/42ab0ebe/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java index 8585ebe..0146344 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java @@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccOperationChecks import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccPartitionedCoordinatorFailoverTest; import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccProcessorLazyStartTest; import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccProcessorTest; +import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccRemoteTxOnNearNodeStartTest; import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccReplicatedCoordinatorFailoverTest; import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccScanQueryWithConcurrentTransactionTest; import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSizeWithConcurrentTransactionTest; @@ -52,6 +53,8 @@ public class IgniteCacheMvccTestSuite extends TestSuite { suite.addTestSuite(DataStreamProcessorMvccSelfTest.class); suite.addTestSuite(CacheMvccOperationChecksTest.class); + suite.addTestSuite(CacheMvccRemoteTxOnNearNodeStartTest.class); + // Concurrent ops tests. suite.addTestSuite(CacheMvccIteratorWithConcurrentTransactionTest.class); suite.addTestSuite(CacheMvccLocalEntriesWithConcurrentTransactionTest.class);
