Repository: ignite Updated Branches: refs/heads/master 66755c1a2 -> bbcf5b400
IGNITE-9972 Fixed memory leak of deleted entries in partition map - Fixes #5196. Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bbcf5b40 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bbcf5b40 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bbcf5b40 Branch: refs/heads/master Commit: bbcf5b4000112148d727a42eead0195cd7606a88 Parents: 66755c1 Author: ibessonov <bessonov...@gmail.com> Authored: Fri Nov 2 12:45:02 2018 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Fri Nov 2 13:54:22 2018 +0300 ---------------------------------------------------------------------- .../configuration/CacheConfiguration.java | 9 +- .../processors/cache/GridCacheMapEntry.java | 50 +++++- .../distributed/GridDistributedCacheEntry.java | 18 +- .../dht/topology/GridDhtLocalPartition.java | 10 +- .../cache/local/GridLocalCacheEntry.java | 22 ++- .../ignite/internal/worker/WorkersRegistry.java | 2 +- .../IgniteTxConcurrentRemoveObjectsTest.java | 165 +++++++++++++++++++ .../testsuites/IgniteCacheTestSuite9.java | 3 + 8 files changed, 245 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/bbcf5b40/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java index d28d842..5c91dc0 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java @@ -50,9 +50,7 @@ import org.apache.ignite.cache.query.annotations.QuerySqlFunction; import org.apache.ignite.cache.store.CacheStore; import org.apache.ignite.cache.store.CacheStoreSessionListener; import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.spi.encryption.EncryptionSpi; import org.apache.ignite.internal.binary.BinaryContext; -import org.apache.ignite.spi.encryption.keystore.KeystoreEncryptionSpi; import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.A; @@ -60,6 +58,8 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.plugin.CachePluginConfiguration; +import org.apache.ignite.spi.encryption.EncryptionSpi; +import org.apache.ignite.spi.encryption.keystore.KeystoreEncryptionSpi; import org.jetbrains.annotations.Nullable; /** @@ -317,7 +317,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { private long rebalanceThrottle = DFLT_REBALANCE_THROTTLE; /** */ - private CacheInterceptor<?, ?> interceptor; + private CacheInterceptor<K, V> interceptor; /** */ private Class<?>[] sqlFuncCls; @@ -1627,9 +1627,8 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { * * @return Cache interceptor. */ - @SuppressWarnings({"unchecked"}) @Nullable public CacheInterceptor<K, V> getInterceptor() { - return (CacheInterceptor<K, V>)interceptor; + return interceptor; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/bbcf5b40/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 4c03663..cc0a78e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -139,7 +139,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme /** * NOTE + * <br/> * ==== + * <br/> * Make sure to recalculate this value any time when adding or removing fields from entry. * The size should be count as follows: * <ul> @@ -147,9 +149,45 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme * <li>References: 8 each</li> * <li>Each nested object should be analyzed in the same way as above.</li> * </ul> + * ==== + * <br/> + * <ul> + * <li>Reference fields:<ul> + * <li>8 : {@link #cctx}</li> + * <li>8 : {@link #key}</li> + * <li>8 : {@link #val}</li> + * <li>8 : {@link #ver}</li> + * <li>8 : {@link #extras}</li> + * <li>8 : {@link #lock}</li> + * <li>8 : {@link #listenerLock}</li> + * <li>8 : {@link GridMetadataAwareAdapter#data}</li> + * </ul></li> + * <li>Primitive fields:<ul> + * <li>4 : {@link #hash}</li> + * <li>1 : {@link #flags}</li> + * </ul></li> + * <li>Extras:<ul> + * <li>8 : {@link GridCacheEntryExtras#ttl()}</li> + * <li>8 : {@link GridCacheEntryExtras#expireTime()}</li> + * </ul></li> + * <li>Version:<ul> + * <li>4 : {@link GridCacheVersion#topVer}</li> + * <li>4 : {@link GridCacheVersion#nodeOrderDrId}</li> + * <li>8 : {@link GridCacheVersion#order}</li> + * </ul></li> + * <li>Key:<ul> + * <li>8 : {@link CacheObjectAdapter#val}</li> + * <li>8 : {@link CacheObjectAdapter#valBytes}</li> + * <li>4 : {@link KeyCacheObjectImpl#part}</li> + * </ul></li> + * <li>Value:<ul> + * <li>8 : {@link CacheObjectAdapter#val}</li> + * <li>8 : {@link CacheObjectAdapter#valBytes}</li> + * </ul></li> + * </ul> */ - // 7 * 8 /*references*/ + 2 * 8 /*long*/ + 1 * 4 /*int*/ + 1 * 1 /*byte*/ + array at parent = 85 - private static final int SIZE_OVERHEAD = 85 /*entry*/ + 32 /* version */ + 4 * 7 /* key + val */; + private static final int SIZE_OVERHEAD = 8 * 8 /* references */ + 5 /* primitives */ + 16 /* extras */ + + 16 /* version */ + 20 /* key */ + 16 /* value */; /** Static logger to avoid re-creation. Made static for test purpose. */ protected static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); @@ -1828,9 +1866,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme unlockListenerReadLock(); } - if (deferred) - cctx.onDeferredDelete(this, newVer); - if (marked) { assert !deferred; @@ -4893,6 +4928,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } /** {@inheritDoc} */ + @Override public void txUnlock(IgniteInternalTx tx) throws GridCacheEntryRemovedException { + removeLock(tx.xidVersion()); + } + + /** {@inheritDoc} */ @Override public void onUnlock() { // No-op. } http://git-wip-us.apache.org/repos/asf/ignite/blob/bbcf5b40/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java index 96f0a55..6dc021b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java @@ -368,6 +368,8 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry { GridCacheMvccCandidate doomed; + GridCacheVersion deferredDelVer; + CacheObject val; lockEntry(); @@ -406,11 +408,22 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry { } val = this.val; + + deferredDelVer = this.ver; } finally { unlockEntry(); } + if (val == null) { + boolean deferred = cctx.deferredDelete() && !detached() && !isInternal(); + + if (deferred) { + if (deferredDelVer != null) + cctx.onDeferredDelete(this, deferredDelVer); + } + } + if (log.isDebugEnabled()) log.debug("Removed lock candidate from entry [doomed=" + doomed + ", owner=" + owner + ", prev=" + prev + ", entry=" + this + ']'); @@ -705,11 +718,6 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry { } } - /** {@inheritDoc} */ - @Override public final void txUnlock(IgniteInternalTx tx) throws GridCacheEntryRemovedException { - removeLock(tx.xidVersion()); - } - /** * @param emptyBefore Empty flag before operation. * @param emptyAfter Empty flag after operation. http://git-wip-us.apache.org/repos/asf/ignite/blob/bbcf5b40/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java index e58c85f..9151159 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java @@ -79,15 +79,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.topolo */ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements Comparable<GridDhtLocalPartition>, GridReservable { /** */ - private static final GridCacheMapEntryFactory ENTRY_FACTORY = new GridCacheMapEntryFactory() { - @Override public GridCacheMapEntry create( - GridCacheContext ctx, - AffinityTopologyVersion topVer, - KeyCacheObject key - ) { - return new GridDhtCacheEntry(ctx, topVer, key); - } - }; + private static final GridCacheMapEntryFactory ENTRY_FACTORY = GridDhtCacheEntry::new; /** Maximum size for delete queue. */ public static final int MAX_DELETE_QUEUE_SIZE = Integer.getInteger(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE, 200_000); http://git-wip-us.apache.org/repos/asf/ignite/blob/bbcf5b40/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java index 412d4d3..e26174a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java @@ -258,15 +258,6 @@ public class GridLocalCacheEntry extends GridCacheMapEntry { } /** - * Unlocks lock if it is currently owned. - * - * @param tx Transaction to unlock. - */ - @Override public void txUnlock(IgniteInternalTx tx) throws GridCacheEntryRemovedException { - removeLock(tx.xidVersion()); - } - - /** * Releases local lock. */ void releaseLocal() { @@ -327,6 +318,8 @@ public class GridLocalCacheEntry extends GridCacheMapEntry { GridCacheMvccCandidate doomed; + GridCacheVersion deferredDelVer; + lockEntry(); try { @@ -351,11 +344,22 @@ public class GridLocalCacheEntry extends GridCacheMapEntry { } val = this.val; + + deferredDelVer = this.ver; } finally { unlockEntry(); } + if (val == null) { + boolean deferred = cctx.deferredDelete() && !detached() && !isInternal(); + + if (deferred) { + if (deferredDelVer != null) + cctx.onDeferredDelete(this, deferredDelVer); + } + } + if (doomed != null) checkThreadChain(doomed); http://git-wip-us.apache.org/repos/asf/ignite/blob/bbcf5b40/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java b/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java index 153b289..3d45f3f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java @@ -212,7 +212,7 @@ public class WorkersRegistry implements GridWorkerListener { // That is, if worker is dead, but still resides in registeredWorkers // then something went wrong, the only extra thing is to test // whether the iterator refers to actual state of registeredWorkers. - GridWorker worker0 = registeredWorkers.get(worker.runner().getName()); + GridWorker worker0 = registeredWorkers.get(runner.getName()); if (worker0 != null && worker0 == worker) workerFailedHnd.apply(worker, SYSTEM_WORKER_TERMINATION); http://git-wip-us.apache.org/repos/asf/ignite/blob/bbcf5b40/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxConcurrentRemoveObjectsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxConcurrentRemoveObjectsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxConcurrentRemoveObjectsTest.java new file mode 100644 index 0000000..a5b2cb7 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxConcurrentRemoveObjectsTest.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import java.util.UUID; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; + +import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_REMOVED_ENTRIES_TTL; +import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; + +/** + * + */ +public class IgniteTxConcurrentRemoveObjectsTest extends GridCommonAbstractTest { + /** Cache partitions. */ + private static final int CACHE_PARTITIONS = 16; + + /** Cache entries count. */ + private static final int CACHE_ENTRIES_COUNT = 512 * CACHE_PARTITIONS; + + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** New value for {@link IgniteSystemProperties#IGNITE_CACHE_REMOVED_ENTRIES_TTL} property. */ + private static final long newIgniteCacheRemovedEntriesTtl = 50L; + + /** Old value of {@link IgniteSystemProperties#IGNITE_CACHE_REMOVED_ENTRIES_TTL} property. */ + private static long oldIgniteCacheRmvEntriesTtl; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + oldIgniteCacheRmvEntriesTtl = Long.getLong(IGNITE_CACHE_REMOVED_ENTRIES_TTL, 10_000); + + System.setProperty(IGNITE_CACHE_REMOVED_ENTRIES_TTL, Long.toString(newIgniteCacheRemovedEntriesTtl)); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + System.setProperty(IGNITE_CACHE_REMOVED_ENTRIES_TTL, Long.toString(oldIgniteCacheRmvEntriesTtl)); + + super.afterTestsStopped(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + super.afterTest(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + return cfg; + } + + /** + * @return Cache configuration. + */ + private CacheConfiguration<Integer, String> getCacheCfg() { + CacheConfiguration<Integer, String> ccfg = new CacheConfiguration<>(); + + ccfg.setName(DEFAULT_CACHE_NAME); + + ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + + ccfg.setAffinity(new RendezvousAffinityFunction().setPartitions(CACHE_PARTITIONS)); + + return ccfg; + } + + /** + * Too many deletes in single transaction may overflow {@link GridDhtLocalPartition#rmvQueue} and entries will be + * deleted synchronously in {@link GridDhtLocalPartition#onDeferredDelete(int, KeyCacheObject, GridCacheVersion)}. + * This should not corrupt internal map state in {@link GridDhtLocalPartition}. + * + * @throws Exception If failed. + */ + public void testTxLeavesObjectsInLocalPartition() throws Exception { + IgniteEx igniteEx = startGrid(getConfiguration()); + + igniteEx.getOrCreateCache(getCacheCfg()); + + try (IgniteDataStreamer<Integer, String> dataStreamer = igniteEx.dataStreamer(DEFAULT_CACHE_NAME)) { + for (int i = 0; i < CACHE_ENTRIES_COUNT; i++) + dataStreamer.addData(i, UUID.randomUUID().toString()); + } + + IgniteEx client = startGrid( + getConfiguration() + .setClientMode(true) + .setIgniteInstanceName(UUID.randomUUID().toString()) + ); + + awaitPartitionMapExchange(); + + assertEquals(CACHE_ENTRIES_COUNT, client.getOrCreateCache(DEFAULT_CACHE_NAME).size()); + + try (Transaction tx = client.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) { + IgniteCache<Integer, String> cache = client.getOrCreateCache(getCacheCfg()); + + for (int v = 0; v < CACHE_ENTRIES_COUNT; v++) { + cache.get(v); + + cache.remove(v); + } + + tx.commit(); + } + + GridTestUtils.waitForCondition( + () -> igniteEx.context().cache().cacheGroups().stream() + .filter(CacheGroupContext::userCache) + .flatMap(cgctx -> cgctx.topology().localPartitions().stream()) + .mapToInt(GridDhtLocalPartition::internalSize) + .max().orElse(-1) == 0, + newIgniteCacheRemovedEntriesTtl * 10 + ); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/bbcf5b40/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java index 173c555..37e3a77 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java @@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.distributed.CacheOperationsIn import org.apache.ignite.internal.processors.cache.distributed.IgniteCachePrimarySyncTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteTxCachePrimarySyncTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteTxCacheWriteSynchronizationModesMultithreadedTest; +import org.apache.ignite.internal.processors.cache.distributed.IgniteTxConcurrentRemoveObjectsTest; import org.apache.ignite.internal.processors.cache.transactions.TxDataConsistencyOnCommitFailureTest; import org.apache.ignite.testframework.junits.GridAbstractTest; @@ -51,6 +52,8 @@ public class IgniteCacheTestSuite9 extends TestSuite { suite.addTestSuite(CacheAtomicPrimarySyncBackPressureTest.class); + suite.addTestSuite(IgniteTxConcurrentRemoveObjectsTest.class); + suite.addTestSuite(TxDataConsistencyOnCommitFailureTest.class); suite.addTestSuite(CacheOperationsInterruptTest.class);