This is an automated email from the ASF dual-hosted git repository. agoncharuk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
commit 4593d6c11005ee5fcad70669e1ea9d8e0bafc7e5 Author: Anton Kalashnikov <[email protected]> AuthorDate: Thu Jan 30 14:08:00 2020 +0300 IGNITE-12593 Fix assertion during CacheDataTree update caused by byte array values and TTL - Fixes #7324. Signed-off-by: Alexey Goncharuk <[email protected]> --- .../processors/cache/GridCacheMapEntry.java | 2 + .../cache/distributed/dht/GridDhtCacheAdapter.java | 9 +- .../cache/transactions/IgniteTxManager.java | 39 ++-- .../IgnitePdsWithTtlDeactivateOnHighloadTest.java | 260 +++++++++++++++++++++ .../ignite/testsuites/IgnitePdsTestSuite2.java | 3 + 5 files changed, 295 insertions(+), 18 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 1ad7eec..97d4fb6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -6037,6 +6037,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme entry.deletedUnlocked(false); } } + else if (oldVal != null && entry.deletedUnlocked()) + entry.deletedUnlocked(false); CacheInvokeEntry<Object, Object> invokeEntry = null; IgniteBiTuple<Object, Exception> invokeRes = null; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 139c67c..6a45ec5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; 
+import javax.cache.Cache; +import javax.cache.expiry.ExpiryPolicy; import java.io.Externalizable; import java.util.Collection; import java.util.Collections; @@ -28,8 +30,6 @@ import java.util.NoSuchElementException; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import javax.cache.Cache; -import javax.cache.expiry.ExpiryPolicy; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; @@ -232,6 +232,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap GridCacheEntryEx entry; while (true) { + ctx.shared().database().checkpointReadLock(); + try { entry = ctx.dht().entryEx(k); @@ -278,6 +280,9 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap break; } + finally { + ctx.shared().database().checkpointReadUnlock(); + } } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index ef849cd..f5e8d8b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -1149,31 +1149,38 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { Collection<IgniteTxEntry> entries = tx.local() ? 
tx.allEntries() : tx.writeEntries(); for (IgniteTxEntry entry : entries) { - GridCacheEntryEx cached = entry.cached(); + cctx.database().checkpointReadLock(); - GridCacheContext cacheCtx = entry.context(); + try { + GridCacheEntryEx cached = entry.cached(); - if (cached == null) - cached = cacheCtx.cache().peekEx(entry.key()); + GridCacheContext cacheCtx = entry.context(); - if (cached.detached()) - continue; + if (cached == null) + cached = cacheCtx.cache().peekEx(entry.key()); - try { - if (cached.obsolete() || cached.markObsoleteIfEmpty(tx.xidVersion())) - cacheCtx.cache().removeEntry(cached); + if (cached.detached()) + continue; + + try { + if (cached.obsolete() || cached.markObsoleteIfEmpty(tx.xidVersion())) + cacheCtx.cache().removeEntry(cached); - if (!tx.near() && isNearEnabled(cacheCtx)) { - GridNearCacheAdapter near = cacheCtx.isNear() ? cacheCtx.near() : cacheCtx.dht().near(); + if (!tx.near() && isNearEnabled(cacheCtx)) { + GridNearCacheAdapter near = cacheCtx.isNear() ? cacheCtx.near() : cacheCtx.dht().near(); - GridNearCacheEntry e = near.peekExx(entry.key()); + GridNearCacheEntry e = near.peekExx(entry.key()); - if (e != null && e.markObsoleteIfEmpty(null)) - near.removeEntry(e); + if (e != null && e.markObsoleteIfEmpty(null)) + near.removeEntry(e); + } + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to remove obsolete entry from cache: " + cached, e); } } - catch (IgniteCheckedException e) { - U.error(log, "Failed to remove obsolete entry from cache: " + cached, e); + finally { + cctx.database().checkpointReadUnlock(); } } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlDeactivateOnHighloadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlDeactivateOnHighloadTest.java new file mode 100644 index 0000000..5e58e5b --- /dev/null +++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlDeactivateOnHighloadTest.java
@@ -0,0 +1,260 @@
+package org.apache.ignite.internal.processors.cache.persistence.db;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import javax.cache.expiry.AccessedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureHandler;
+import org.apache.ignite.failure.NoOpFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.MvccFeatureChecker;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.testframework.GridTestUtils.runMultiThreadedAsync;
+import static org.apache.ignite.testframework.GridTestUtils.waitForAllFutures;
+
+/**
+ * Tests concurrent puts into a persistent cache whose entries carry a short, eagerly processed TTL.
+ */
+@WithSystemProperty(key = IgniteSystemProperties.IGNITE_UNWIND_THROTTLING_TIMEOUT, value = "5")
+public class IgnitePdsWithTtlDeactivateOnHighloadTest extends GridCommonAbstractTest {
+    /** Name of the cache under test. */
+    private static final String CACHE_NAME = "expirable-cache";
+
+    /** Cache group name. */
+    private static final String GROUP_NAME = "group1";
+
+    /** Number of partitions passed to the affinity function. */
+    private static final int PART_SIZE = 2;
+
+    /** Duration of the access-based expiry policy, in milliseconds. */
+    private static final int EXPIRATION_TIMEOUT = 10;
+
+    /** Number of keys written by each workload iteration. */
+    private static final int ENTRIES = 10;
+
+    /** Number of threads per workload. */
+    private static final int WORKLOAD_THREADS_CNT = 8;
+
+    /** Set to {@code true} once the failure handler has been invoked. */
+    private volatile boolean fail;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.EXPIRATION);
+
+        super.beforeTest();
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        final IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setDataStorageConfiguration(
+            new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(
+                    new DataRegionConfiguration()
+                        .setMaxSize(2L * 1024 * 1024 * 1024)
+                        .setPersistenceEnabled(true)
+                ).setWalMode(WALMode.LOG_ONLY));
+
+        cfg.setCacheConfiguration(getCacheConfiguration(CACHE_NAME));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected FailureHandler getFailureHandler(String igniteInstanceName) {
+        return new NoOpFailureHandler() {
+            @Override protected boolean handle(Ignite ignite, FailureContext failureCtx) {
+                fail = true;
+
+                return super.handle(ignite, failureCtx);
+            }
+        };
+    }
+
+    /**
+     * Returns a new cache configuration with the given name and {@code GROUP_NAME} group.
+     *
+     * @param name Cache name.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration getCacheConfiguration(String name) {
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setName(name);
+        ccfg.setGroupName(GROUP_NAME);
+        ccfg.setAffinity(new RendezvousAffinityFunction(false, PART_SIZE));
+        ccfg.setExpiryPolicyFactory(AccessedExpiryPolicy.factoryOf(new Duration(TimeUnit.MILLISECONDS, EXPIRATION_TIMEOUT)));
+        ccfg.setEagerTtl(true);
+        ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        ccfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+
+        return ccfg;
+    }
+
+    /**
+     * Runs {@code putAll} and {@code put} workloads while another thread repeatedly holds the checkpoint write lock.
+     *
+     * @throws Exception if failed.
+     */
+    @Test
+    public void shouldNotBeProblemToPutToExpiredCacheConcurrentlyWithCheckpoint() throws Exception {
+        IgniteEx ig0 = startGrid(0);
+
+        ig0.cluster().active(true);
+
+        IgniteCache<Object, Object> cache = ig0.getOrCreateCache(CACHE_NAME);
+
+        AtomicBoolean timeoutReached = new AtomicBoolean(false);
+
+        GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)ig0.context().cache().context().database();
+
+        IgniteInternalFuture<?> ldrFut = runMultiThreadedAsync(() -> {
+            while (!timeoutReached.get()) {
+                Map<Integer, Integer> map = new TreeMap<>();
+
+                for (int i = 0; i < ENTRIES; i++)
+                    map.put(i, i);
+
+                cache.putAll(map);
+            }
+        }, WORKLOAD_THREADS_CNT, "loader");
+
+        IgniteInternalFuture<?> updaterFut = runMultiThreadedAsync(() -> {
+            while (!timeoutReached.get()) {
+                for (int i = 0; i < ENTRIES; i++)
+                    cache.put(i, i * 10);
+            }
+        }, WORKLOAD_THREADS_CNT, "updater");
+
+        IgniteInternalFuture<?> cpWriteLockUnlockFut = runAsync(() -> {
+            ReentrantReadWriteLock lock = U.field(db, "checkpointLock");
+
+            while (!timeoutReached.get()) {
+                try {
+                    lock.writeLock().lockInterruptibly();
+                }
+                catch (InterruptedException ignored) {
+                    break; // Lock was not acquired, so there is nothing to unlock.
+                }
+
+                try {
+                    doSleep(30);
+                }
+                finally {
+                    lock.writeLock().unlock();
+                }
+
+                doSleep(30);
+            }
+        }, "cp-write-lock-holder");
+
+        doSleep(10_000);
+
+        timeoutReached.set(true);
+
+        waitForAllFutures(cpWriteLockUnlockFut, ldrFut, updaterFut);
+    }
+
+    /**
+     * Puts and reads byte array values from several threads and checks that the failure handler is not invoked.
+     *
+     * @throws Exception if failed.
+     */
+    @Test
+    public void shouldNotBeProblemToPutToExpiredCacheConcurrently() throws Exception {
+        final AtomicBoolean end = new AtomicBoolean();
+
+        final IgniteEx srv = startGrid(0);
+
+        srv.cluster().active(true);
+
+        // Start high workload
+        IgniteInternalFuture<?> loadFut = runMultiThreadedAsync(() -> {
+            while (!end.get() && !fail)
+                fillCache(srv.cache(CACHE_NAME));
+        }, WORKLOAD_THREADS_CNT, "high-workload");
+
+        try {
+            loadFut.get(10, TimeUnit.SECONDS); // Let the workload run; a timeout here is the expected outcome.
+        }
+        catch (Exception e) {
+            assertFalse("Failure handler was called. See log above.", fail);
+
+            assertTrue(X.hasCause(e, IgniteFutureTimeoutCheckedException.class));
+        }
+        finally {
+            end.set(true);
+        }
+
+        assertFalse("Failure handler was called. See log above.", fail);
+    }
+
+    /**
+     * Puts {@code ENTRIES} 1 KB values into the cache and then reads every key back.
+     *
+     * @param cache Cache to fill.
+     */
+    protected void fillCache(IgniteCache<Integer, byte[]> cache) {
+        for (int i = 0; i < ENTRIES; i++)
+            cache.put(i, new byte[1024]);
+
+        for (int i = 0; i < ENTRIES; i++)
+            cache.get(i); // Touch entries (the cache uses an access-based expiry policy).
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java index dcd60b9..d281e6a 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java @@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsRebal import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsReserveWalSegmentsTest; import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsReserveWalSegmentsWithCompactionTest; import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsWholeClusterRestartTest; +import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsWithTtlDeactivateOnHighloadTest; import org.apache.ignite.internal.processors.cache.persistence.db.IgniteShutdownOnSupplyMessageFailureTest; import org.apache.ignite.internal.processors.cache.persistence.db.SlowHistoricalRebalanceSmallHistoryTest; import org.apache.ignite.internal.processors.cache.persistence.db.checkpoint.CheckpointFailBeforeWriteMarkTest; @@ -250,5 +251,7 @@ public class IgnitePdsTestSuite2 { GridTestUtils.addTestIfNeeded(suite, FsyncWalRolloverDoesNotBlockTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, IgnitePdsPartitionsStateRecoveryTest.class, ignoredTests); + + GridTestUtils.addTestIfNeeded(suite, IgnitePdsWithTtlDeactivateOnHighloadTest.class, ignoredTests); } }
