ignite-5937
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c1b2c03d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c1b2c03d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c1b2c03d Branch: refs/heads/ignite-5937 Commit: c1b2c03dc1ee9de222997cba4efcb2e5fb1a5885 Parents: c553638 Author: sboikov <sboi...@gridgain.com> Authored: Mon Oct 9 17:05:02 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Mon Oct 9 17:50:53 2017 +0300 ---------------------------------------------------------------------- .../cache/IgniteCacheOffheapManagerImpl.java | 285 +++++++------------ .../GridDhtPartitionsExchangeFuture.java | 2 +- .../cache/mvcc/CacheCoordinatorsProcessor.java | 32 ++- .../cache/mvcc/CacheMvccClusterRestartTest.java | 173 +++++++++++ 4 files changed, 308 insertions(+), 184 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/c1b2c03d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index dd4d7e0..80d36c1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -1356,15 +1356,6 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager return dataRow; } - private int compare(CacheDataRow row, long crdVer, long mvccCntr) { - int cmp = Long.compare(row.mvccCoordinatorVersion(), crdVer); - - if (cmp != 0) - return cmp; - - return Long.compare(row.mvccCounter(), mvccCntr); - } - /** {@inheritDoc} */ @Override public GridLongList mvccRemove(GridCacheContext cctx, boolean primary, @@ -1376,9 +1367,67 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager throw new NodeStoppingException("Operation has been cancelled (node is stopping)."); try { + int cacheId = grp.storeCacheIdInDataPage() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID; + CacheObjectContext coCtx = cctx.cacheObjectContext(); - return null; + // Make sure value bytes initialized. + key.valueBytes(coCtx); + + MvccUpdateRow updateRow = new MvccUpdateRow( + key, + null, + null, + mvccVer, + partId, + cacheId); + + rowStore.addRow(updateRow); + + assert updateRow.link() != 0 : updateRow; + + if (grp.sharedGroup() && updateRow.cacheId() == CU.UNDEFINED_CACHE_ID) + updateRow.cacheId(cctx.cacheId()); + + GridLongList waitTxs = null; + + if (mvccVer.initialLoad()) { + boolean old = dataTree.putx(updateRow); + + assert !old; + + incrementSize(cctx.cacheId()); + } + else { + dataTree.iterate(updateRow, new MvccKeyMinVersionBound(cacheId, key), updateRow); + + boolean old = dataTree.putx(updateRow); + + assert !old; + + if (!updateRow.previousNotNull()) + incrementSize(cctx.cacheId()); + + waitTxs = updateRow.activeTransactions(); + + List<CacheSearchRow> cleanupRows = updateRow.cleanupRows(); + + if (cleanupRows != null) { + for (int i = 0; i < cleanupRows.size(); i++) { + CacheSearchRow oldRow = cleanupRows.get(i); + + assert oldRow.link() != 0L : oldRow; + + boolean rmvd = dataTree.removex(oldRow); + + assert rmvd; + + rowStore.removeRow(oldRow.link()); + } + } + } + + return waitTxs; } finally { busyLock.leaveBusy(); @@ -1407,135 +1456,60 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager key.valueBytes(coCtx); val.valueBytes(coCtx); - if (true) { - MvccUpdateRow updateRow = new MvccUpdateRow( - key, - val, - ver, - mvccVer, - partId, - cacheId); - - rowStore.addRow(updateRow); - - assert updateRow.link() != 0 : updateRow; - - if (grp.sharedGroup() && updateRow.cacheId() == CU.UNDEFINED_CACHE_ID) - updateRow.cacheId(cctx.cacheId()); - - GridLongList waitTxs = null; - - if (mvccVer.initialLoad()) { - boolean old = dataTree.putx(updateRow); - - assert !old; - - incrementSize(cctx.cacheId()); - } - else { - dataTree.iterate(updateRow, new MvccKeyMinVersionBound(cacheId, key), updateRow); - - boolean old = dataTree.putx(updateRow); - - assert !old; - - if (!updateRow.previousNotNull()) - incrementSize(cctx.cacheId()); - - waitTxs = updateRow.activeTransactions(); + MvccUpdateRow updateRow = new MvccUpdateRow( + key, + val, + ver, + mvccVer, + partId, + cacheId); - List<CacheSearchRow> cleanupRows = updateRow.cleanupRows(); + rowStore.addRow(updateRow); - if (cleanupRows != null) { - for (int i = 0; i < cleanupRows.size(); i++) { - CacheSearchRow oldRow = cleanupRows.get(i); + assert updateRow.link() != 0 : updateRow; - assert oldRow.link() != 0L : oldRow; + if (grp.sharedGroup() && updateRow.cacheId() == CU.UNDEFINED_CACHE_ID) + updateRow.cacheId(cctx.cacheId()); - boolean rmvd = dataTree.removex(oldRow); + GridLongList waitTxs = null; - assert rmvd; + if (mvccVer.initialLoad()) { + boolean old = dataTree.putx(updateRow); - rowStore.removeRow(oldRow.link()); - } - } - } + assert !old; - return waitTxs; + incrementSize(cctx.cacheId()); } else { - MvccDataRow dataRow = new MvccDataRow( - key, - val, - ver, - partId, - cacheId, - mvccVer.coordinatorVersion(), - mvccVer.counter()); - - rowStore.addRow(dataRow); + dataTree.iterate(updateRow, new MvccKeyMinVersionBound(cacheId, key), updateRow); - assert dataRow.link() != 0 : dataRow; - - if (grp.sharedGroup() && dataRow.cacheId() == CU.UNDEFINED_CACHE_ID) - dataRow.cacheId(cctx.cacheId()); - - boolean old = dataTree.putx(dataRow); + boolean old = dataTree.putx(updateRow); assert !old; - GridLongList waitTxs = null; - - if (!mvccVer.initialLoad()) { - MvccLongList activeTxs = mvccVer.activeTransactions(); - - // TODO IGNITE-3484: need special method. - GridCursor<CacheDataRow> cur = dataTree.find( - new MvccSearchRow(cacheId, key, mvccVer.coordinatorVersion(), mvccVer.counter() - 1), - new MvccSearchRow(cacheId, key, 1, 1)); - - boolean first = true; - - boolean activeTx = false; - - while (cur.next()) { - CacheDataRow oldVal = cur.get(); - - assert oldVal.link() != 0 : oldVal; - - if (activeTxs != null && oldVal.mvccCoordinatorVersion() == mvccVer.coordinatorVersion() && - activeTxs.contains(oldVal.mvccCounter())) { - if (waitTxs == null) - waitTxs = new GridLongList(); + if (!updateRow.previousNotNull()) + incrementSize(cctx.cacheId()); - assert oldVal.mvccCounter() != mvccVer.counter(); + waitTxs = updateRow.activeTransactions(); - waitTxs.add(oldVal.mvccCounter()); + List<CacheSearchRow> cleanupRows = updateRow.cleanupRows(); - activeTx = true; - } + if (cleanupRows != null) { + for (int i = 0; i < cleanupRows.size(); i++) { + CacheSearchRow oldRow = cleanupRows.get(i); - if (!activeTx) { - // Should not delete oldest version which is less than cleanup version. - int cmp = compare(oldVal, mvccVer.coordinatorVersion(), mvccVer.cleanupVersion()); + assert oldRow.link() != 0L : oldRow; - if (cmp <= 0) { - if (first) - first = false; - else { - boolean rmvd = dataTree.removex(oldVal); + boolean rmvd = dataTree.removex(oldRow); - assert rmvd; + assert rmvd; - rowStore.removeRow(oldVal.link()); - } - } - } + rowStore.removeRow(oldRow.link()); } } - - return waitTxs; } + + return waitTxs; } finally { busyLock.leaveBusy(); @@ -1746,26 +1720,15 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager CacheDataRow row; if (grp.mvccEnabled()) { - if (true) { - MvccKeyMaxVersionBound searchRow = new MvccKeyMaxVersionBound(cacheId, key); - - dataTree.iterate( - searchRow, - new MvccKeyMinVersionBound(cacheId, key), - searchRow // Use the same instance as closure to do not create extra object. - ); + MvccKeyMaxVersionBound searchRow = new MvccKeyMaxVersionBound(cacheId, key); - row = searchRow.row(); - } - else { - GridCursor<CacheDataRow> cur = dataTree.find(new MvccSearchRow(cacheId, key, Long.MAX_VALUE, Long.MAX_VALUE), - new MvccSearchRow(cacheId, key, 1, 1)); + dataTree.iterate( + searchRow, + new MvccKeyMinVersionBound(cacheId, key), + searchRow // Use the same instance as closure to do not create extra object. + ); - if (cur.next()) - row = cur.get(); - else - row = null; - } + row = searchRow.row(); } else row = dataTree.findOne(new SearchRow(cacheId, key), CacheDataRowAdapter.RowData.NO_KEY); @@ -1818,55 +1781,19 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID; - if (true) { - MvccVersionBasedSearchRow lower = new MvccVersionBasedSearchRow(cacheId, key, ver); - - dataTree.iterate( - lower, - new MvccKeyMinVersionBound(cacheId, key), - lower // Use the same instance as closure to do not create extra object. - ); - - CacheDataRow row = lower.row(); - - afterRowFound(row, key); - - return row; - } - else { - GridCursor<CacheDataRow> cur = dataTree.find( - new MvccSearchRow(cacheId, key, ver.coordinatorVersion(), ver.counter()), - new MvccSearchRow(cacheId, key, 1, 1)); - - CacheDataRow row = null; + MvccVersionBasedSearchRow lower = new MvccVersionBasedSearchRow(cacheId, key, ver); - MvccLongList txs = ver.activeTransactions(); + dataTree.iterate( + lower, + new MvccKeyMinVersionBound(cacheId, key), + lower // Use the same instance as closure to do not create extra object. + ); - while (cur.next()) { - CacheDataRow row0 = cur.get(); + CacheDataRow row = lower.row(); - assert row0.mvccCoordinatorVersion() > 0 : row0; - - boolean visible; - - if (txs != null) { - visible = row0.mvccCoordinatorVersion() != ver.coordinatorVersion() - || !txs.contains(row0.mvccCounter()); - } - else - visible = true; - - if (visible) { - row = row0; - - break; - } - } - - assert row == null || key.equals(row.key()); + afterRowFound(row, key); - return row; - } + return row; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/c1b2c03d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 830d50b..88095ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -560,7 +560,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte MvccCoordinator mvccCrd = firstEvtDiscoCache.mvccCoordinator(); boolean mvccCrdChange = mvccCrd != null && - initialVersion().equals(mvccCrd.topologyVersion()); + (initialVersion().equals(mvccCrd.topologyVersion()) || activateCluster()); cctx.kernalContext().coordinators().currentCoordinator(mvccCrd); http://git-wip-us.apache.org/repos/asf/ignite/blob/c1b2c03d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java index b9b8ea1..54fb3c8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java @@ -66,6 +66,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED; import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED; import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE_COORDINATOR; +import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; /** @@ -86,7 +87,13 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { /** */ private static final byte MSG_POLICY = SYSTEM_POOL; - + + /** */ + private static final long CRD_VER_MASK = 0x3F_FF_FF_FF_FF_FF_FF_FFL; + + /** */ + private static final long RMVD_VAL_VER_MASK = 0x80_00_00_00_00_00_00_00L; + /** */ private volatile MvccCoordinator curCrd; @@ -139,6 +146,21 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { super(ctx); } + public static int compareCoordinatorVersions(long crdVer1, long crdVer2) { + crdVer1 = CRD_VER_MASK & crdVer1; + crdVer2 = CRD_VER_MASK & crdVer2; + + return Long.compare(crdVer1, crdVer2); + } + + public long createVersionForRemovedValue(long crdVer) { + return crdVer | RMVD_VAL_VER_MASK; + } + + public boolean versionForRemovedValue(long crdVer) { + return (crdVer & RMVD_VAL_VER_MASK) != 0; + } + /** {@inheritDoc} */ @Override public void start() throws IgniteCheckedException { statCntrs = new StatCounter[7]; @@ -199,7 +221,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { * @param topVer Topology version. */ public void onDiscoveryEvent(int evtType, Collection<ClusterNode> nodes, long topVer) { - if (evtType == EVT_NODE_METRICS_UPDATED) + if (evtType == EVT_NODE_METRICS_UPDATED || evtType == EVT_DISCOVERY_CUSTOM_EVT) return; MvccCoordinator crd; @@ -778,7 +800,9 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { private MvccCoordinatorVersionResponse assignQueryCounter(UUID qryNodeId, long futId) { assert crdVer != 0; - return activeQueries.assignQueryCounter(qryNodeId, futId); + MvccCoordinatorVersionResponse res = activeQueries.assignQueryCounter(qryNodeId, futId); + + return res; // MvccCoordinatorVersionResponse res = new MvccCoordinatorVersionResponse(); // @@ -989,7 +1013,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { log.info("Initialize local node as mvcc coordinator [node=" + ctx.localNodeId() + ", topVer=" + topVer + ']'); - crdVer = topVer.topologyVersion(); + crdVer = topVer.topologyVersion() + ctx.discovery().gridStartTime(); prevCrdQueries.init(activeQueries, discoCache, ctx.discovery()); http://git-wip-us.apache.org/repos/asf/ignite/blob/c1b2c03d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccClusterRestartTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccClusterRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccClusterRestartTest.java new file mode 100644 index 0000000..ed7b62d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccClusterRestartTest.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.MemoryConfiguration; +import org.apache.ignite.configuration.PersistentStoreConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; + +/** + * + */ +public class CacheMvccClusterRestartTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setConsistentId(gridName); + + cfg.setMvccEnabled(true); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + MemoryConfiguration memCfg = new MemoryConfiguration(); + memCfg.setPageSize(1024); + memCfg.setDefaultMemoryPolicySize(100 * 1024 * 1024); + + cfg.setMemoryConfiguration(memCfg); + + cfg.setPersistentStoreConfiguration(new PersistentStoreConfiguration().setWalMode(WALMode.LOG_ONLY)); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + GridTestUtils.deleteDbFiles(); + + super.afterTestsStopped(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + GridTestUtils.deleteDbFiles(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + GridTestUtils.deleteDbFiles(); + + stopAllGrids(); + + super.afterTest(); + } + + /** + * @throws Exception If failed. + */ + public void testRestart1() throws Exception { + restart1(3, 3); + } + + /** + * @throws Exception If failed. + */ + public void testRestart2() throws Exception { + restart1(1, 3); + } + + /** + * @throws Exception If failed. + */ + public void testRestart3() throws Exception { + restart1(3, 1); + } + + /** + * @param srvBefore Number of servers before restart. + * @param srvAfter Number of servers after restart. + * @throws Exception If failed. + */ + private void restart1(int srvBefore, int srvAfter) throws Exception { + Ignite srv0 = startGridsMultiThreaded(srvBefore); + + srv0.active(true); + + IgniteCache<Object, Object> cache = srv0.createCache(cacheConfiguration()); + + Set<Integer> keys = new HashSet<>(primaryKeys(cache, 1, 0)); + + try (Transaction tx = srv0.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + for (Integer k : keys) + cache.put(k, k); + + tx.commit(); + } + + stopAllGrids(); + + srv0 = startGridsMultiThreaded(srvAfter); + + srv0.active(true); + + cache = srv0.cache(DEFAULT_CACHE_NAME); + + Map<Object, Object> res = cache.getAll(keys); + + assertEquals(keys.size(), res.size()); + + for (Integer k : keys) + assertEquals(k, cache.get(k)); + + try (Transaction tx = srv0.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + for (Integer k : keys) + cache.put(k, k + 1); + + tx.commit(); + } + + for (Integer k : keys) + assertEquals(k + 1, cache.get(k)); + } + + /** + * @return Cache configuration. + */ + private CacheConfiguration<Object, Object> cacheConfiguration() { + CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME); + + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setBackups(2); + + return ccfg; + } +}