This is an automated email from the ASF dual-hosted git repository. agura pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 3873693 IGNITE-9739 don't write non-baseline nodes to wal TxRecord 3873693 is described below commit 3873693d7851c68f0763bddee64c6bbc819dde58 Author: Sergey Kosarev <skosa...@gridgain.com> AuthorDate: Wed Jan 16 18:43:38 2019 +0300 IGNITE-9739 don't write non-baseline nodes to wal TxRecord Signed-off-by: Andrey Gura <ag...@apache.org> --- .../managers/discovery/ConsistentIdMapper.java | 10 +- .../cache/transactions/IgniteTxManager.java | 28 +++-- .../distributed/CacheBaselineTopologyTest.java | 113 ++++++++++++++++++++- 3 files changed, 135 insertions(+), 16 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/ConsistentIdMapper.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/ConsistentIdMapper.java index 59f773d..ac72afe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/ConsistentIdMapper.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/ConsistentIdMapper.java @@ -99,8 +99,6 @@ public class ConsistentIdMapper { if (txNodes == null) return null; - Map<Object, Short> constIdMap = baselineTop.consistentIdMapping(); - Map<UUID, Short> m = discoveryMgr.consistentId(topVer); int bltNodes = m.size(); @@ -112,15 +110,19 @@ public class ConsistentIdMapper { for (Map.Entry<UUID, Collection<UUID>> e : txNodes.entrySet()) { UUID node = e.getKey(); + if (!m.containsKey(node)) // not in blt + continue; + Collection<UUID> backupNodes = e.getValue(); Collection<Short> backups = new ArrayList<>(backupNodes.size()); for (UUID backup : backupNodes) { - if (m.containsKey(backup)) + if (m.containsKey(backup)) { nodeCnt++; - backups.add(mapToCompactId(topVer, backup)); + backups.add(mapToCompactId(topVer, backup)); + } } // Optimization for short store full nodes set. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index ce914e8..e55676c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -2476,13 +2476,15 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { if (cctx.wal() != null && logTxRecords) { TxRecord txRecord = newTxRecord(tx); - try { - return cctx.wal().log(txRecord); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to log TxRecord: " + txRecord, e); + if (txRecord != null) { + try { + return cctx.wal().log(txRecord); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to log TxRecord: " + txRecord, e); - throw new IgniteException("Failed to log TxRecord: " + txRecord, e); + throw new IgniteException("Failed to log TxRecord: " + txRecord, e); + } } } @@ -2498,12 +2500,16 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { private TxRecord newTxRecord(IgniteTxAdapter tx) { BaselineTopology baselineTop = cctx.kernalContext().state().clusterState().baselineTopology(); - Map<Short, Collection<Short>> nodes = tx.consistentIdMapper.mapToCompactIds(tx.topVer, tx.txNodes, baselineTop); + if (baselineTop != null && baselineTop.consistentIds().contains(cctx.localNode().consistentId())) { + Map<Short, Collection<Short>> nodes = tx.consistentIdMapper.mapToCompactIds(tx.topVer, tx.txNodes, baselineTop); - if (tx.txState().mvccEnabled()) - return new MvccTxRecord(tx.state(), tx.nearXidVersion(), tx.writeVersion(), nodes, tx.mvccSnapshot()); - else - return new TxRecord(tx.state(), tx.nearXidVersion(), tx.writeVersion(), nodes); + if (tx.txState().mvccEnabled()) + return new MvccTxRecord(tx.state(), tx.nearXidVersion(), tx.writeVersion(), nodes, tx.mvccSnapshot()); + else + return new TxRecord(tx.state(), tx.nearXidVersion(), tx.writeVersion(), nodes); + } + + return null; } /** diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java index 2727350..e2cef92 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java @@ -30,6 +30,7 @@ import java.util.Set; import java.util.UUID; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CachePeekMode; @@ -47,17 +48,21 @@ import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_LOG_TX_RECORDS; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_SAFE; @@ -103,6 +108,8 @@ public class CacheBaselineTopologyTest extends GridCommonAbstractTest { client = false; disableAutoActivation = false; + + System.clearProperty(IGNITE_WAL_LOG_TX_RECORDS); } /** {@inheritDoc} */ @@ -834,6 +841,110 @@ public class CacheBaselineTopologyTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + @Test + public void testNotMapNonBaselineTxPrimaryNodes() throws Exception { + checkNotMapNonBaselineTxNodes(true, false); + } + + /** + * + * @throws Exception If failed. + */ + @Test + public void testNotMapNonBaselineTxBackupNodes() throws Exception { + checkNotMapNonBaselineTxNodes(false, false); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testNotMapNonBaselineNearTxPrimaryNodes() throws Exception { + checkNotMapNonBaselineTxNodes(true, true); + } + + /** + * + * @throws Exception If failed. + */ + @Test + public void testNotMapNonBaselineNearTxBackupNodes() throws Exception { + checkNotMapNonBaselineTxNodes(false, true); + } + + /** + * @param primary Whether non-baseline node is primary. + * @param near Whether non-baseline nod is near node. + * @throws Exception If failed. + */ + public void checkNotMapNonBaselineTxNodes(boolean primary, boolean near) throws Exception { + System.setProperty(IgniteSystemProperties.IGNITE_WAL_LOG_TX_RECORDS, "true"); + + int bltNodesCnt = 3; + + Ignite ig = startGrids(bltNodesCnt); + + ig.cluster().active(true); + + ig.createCache(new CacheConfiguration<>() + .setName(CACHE_NAME) + .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) + .setBackups(2)); + + ig.createCache( + new CacheConfiguration<>() + .setName(CACHE_NAME + 1) + .setDataRegionName("memory") + .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) + .setBackups(2) + ); + + Ignite nonBltIgnite = startGrid(bltNodesCnt); + + awaitPartitionMapExchange(); + + ClusterNode nonBltNode = nonBltIgnite.cluster().localNode(); + + Ignite nearIgnite = near ? nonBltIgnite : ig; + + IgniteCache<Integer, Integer> persistentCache = nearIgnite.cache(CACHE_NAME); + + IgniteCache<Integer, Integer> inMemoryCache = nearIgnite.cache(CACHE_NAME + 1); + + assertEquals(0, nearIgnite.affinity(persistentCache.getName()).allPartitions(nonBltNode).length); + + assertTrue(nearIgnite.affinity(inMemoryCache.getName()).allPartitions(nonBltNode).length > 0); + + ClusterNode nearNode = nearIgnite.cluster().localNode(); + + try (Transaction tx = nearIgnite.transactions().txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.READ_COMMITTED)) { + for (int i = 0; ; i++) { + List<ClusterNode> nodes = new ArrayList<>(nearIgnite.affinity(inMemoryCache.getName()) + .mapKeyToPrimaryAndBackups(i)); + + ClusterNode primaryNode = nodes.get(0); + + List<ClusterNode> backupNodes = nodes.subList(1, nodes.size()); + + if (nonBltNode.equals(primaryNode) == primary) { + if (backupNodes.contains(nonBltNode) != primary) { + inMemoryCache.put(i, i); + + // add some persistent data in the same transaction + for (int j = 0; j < 100; j++) + persistentCache.put(j, j); + + break; + } + } + } + tx.commit(); + } + } + + /** * @throws Exception if failed. */ @Test