Repository: ignite
Updated Branches:
  refs/heads/master f57b807bb -> 88df5ee60


IGNITE-9446: MVCC: improved handling of vacuum errors caused by concurrent changes to cache or node stop. This closes #4940.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/88df5ee6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/88df5ee6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/88df5ee6

Branch: refs/heads/master
Commit: 88df5ee607b23a30e051e9724dcd97b7e43aa6d6
Parents: f57b807
Author: rkondakov <[email protected]>
Authored: Thu Oct 11 16:11:52 2018 +0300
Committer: devozerov <[email protected]>
Committed: Thu Oct 11 16:11:52 2018 +0300

----------------------------------------------------------------------
 .../cache/mvcc/MvccProcessorImpl.java           | 58 +++++++++++++++++++-
 .../cache/mvcc/CacheMvccAbstractTest.java       |  7 ++-
 .../mvcc/CacheMvccSqlQueriesAbstractTest.java   |  2 -
 3 files changed, 59 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/88df5ee6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
index 3c45963..40d4ac9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
@@ -2078,9 +2078,11 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
 
             VacuumMetrics metrics = new VacuumMetrics();
 
-            if (part == null || part.state() != OWNING || !part.reserve())
+            if (!canRunVacuum(part, null) || !part.reserve())
                 return metrics;
 
+            int curCacheId = CU.UNDEFINED_CACHE_ID;
+
             try {
                 GridCursor<? extends CacheDataRow> cursor = part.dataStore().cursor(KEY_ONLY);
 
@@ -2094,8 +2096,6 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
 
                 GridCacheContext cctx = null;
 
-                int curCacheId = CU.UNDEFINED_CACHE_ID;
-
                 boolean shared = part.group().sharedGroup();
 
                 if (!shared && (cctx = F.first(part.group().caches())) == null)
@@ -2150,11 +2150,63 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
 
                 return metrics;
             }
+            catch (Exception e) {
+                if (canRunVacuum(part, curCacheId))
+                    throw e; // Unexpected error.
+
+                U.warn(log, "Error occurred during the vacuum. Skip vacuuming for the current partition. " +
+                    "[cacheId=" + curCacheId + ", part=" + part + ", err=" + e.getMessage() + ']', e);
+
+                return new VacuumMetrics();
+            }
             finally {
                 part.release();
             }
         }
 
+        /**
+         * @param part Partition.
+         * @param cacheId Cache id.
+         * @return {@code True} if we can vacuum given partition.
+         */
+        private boolean canRunVacuum(GridDhtLocalPartition part, Integer cacheId) {
+            if (part == null || part.state() != OWNING)
+                return false;
+
+            CacheGroupContext grp = part.group();
+
+            assert grp != null;
+
+            List<GridCacheContext> caches = grp.caches();
+
+            if (F.isEmpty(caches))
+                return false;
+
+            if (grp.shared().kernalContext().isStopping())
+                return false;
+
+            if (cacheId == null && grp.sharedGroup())
+                return true; // Cache context is unknown, but we can try to run vacuum.
+
+            GridCacheContext ctx0;
+
+            if (grp.sharedGroup()) {
+                assert cacheId != null && cacheId != CU.UNDEFINED_CACHE_ID;
+
+                if (!grp.cacheIds().contains(cacheId))
+                    return false;
+
+                ctx0 = grp.shared().cacheContext(cacheId);
+            }
+            else
+                ctx0 = caches.get(0);
+
+            if (ctx0 == null)
+                return false;
+
+            return !grp.shared().closed(ctx0);
+        }
+
         /** */
         @SuppressWarnings("unchecked")
         @NotNull private Object addRest(@Nullable Object rest, MvccDataRow row) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/88df5ee6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
index d75b8e0..ec6b78a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
@@ -228,8 +228,7 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
         persistence = false;
 
         try {
-            if(disableScheduledVacuum)
-                verifyOldVersionsCleaned();
+            verifyOldVersionsCleaned();
 
             verifyCoordinatorInternalState();
         }
@@ -1535,6 +1534,8 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
     protected void verifyOldVersionsCleaned() throws Exception {
         runVacuumSync();
 
+        awaitPartitionMapExchange();
+
         // Check versions.
         boolean cleaned = checkOldVersions(false);
 
@@ -1559,7 +1560,7 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
             for (IgniteCacheProxy cache : ((IgniteKernal)node).caches()) {
                 GridCacheContext cctx = cache.context();
 
-                if (!cctx.userCache() || !cctx.group().mvccEnabled())
+                if (!cctx.userCache() || !cctx.group().mvccEnabled() || F.isEmpty(cctx.group().caches()) || cctx.shared().closed(cctx))
                     continue;
 
                 for (Iterator it = cache.withKeepBinary().iterator(); it.hasNext(); ) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/88df5ee6/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesAbstractTest.java
index bcbfbc2..313b58f 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesAbstractTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesAbstractTest.java
@@ -633,8 +633,6 @@ public abstract class CacheMvccSqlQueriesAbstractTest extends CacheMvccAbstractT
      * @throws Exception If failed.
      */
     public void testDistributedJoinSimple() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-9446");
-
         startGridsMultiThreaded(4);
 
         Ignite srv0 = ignite(0);
