Repository: ignite Updated Branches: refs/heads/master 5135f82f3 -> 09002f2e0
IGNITE-7918 Huge memory leak when data streamer used together with local cache. - Fixes #3778. Signed-off-by: dspavlov <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/09002f2e Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/09002f2e Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/09002f2e Branch: refs/heads/master Commit: 09002f2e05629a1c71443ed5e135ea125f0e7722 Parents: 5135f82 Author: Andrei Aleksandrov <[email protected]> Authored: Sat Apr 28 19:07:55 2018 +0300 Committer: dspavlov <[email protected]> Committed: Sat Apr 28 19:07:55 2018 +0300 ---------------------------------------------------------------------- .../affinity/GridAffinityAssignmentCache.java | 58 +++--- .../affinity/GridAffinityProcessor.java | 81 +++++++- .../affinity/AffinityHistoryCleanupTest.java | 86 ++++---- .../GridAffinityProcessorMemoryLeakTest.java | 202 +++++++++++++++++++ .../ignite/testsuites/IgniteBasicTestSuite.java | 2 + 5 files changed, 346 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/09002f2e/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java index e420977..34e2b0a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java @@ -122,9 +122,6 @@ public class GridAffinityAssignmentCache { /** Node stop flag. */ private volatile IgniteCheckedException stopErr; - /** History size ignoring client events changes. */ - private final AtomicInteger histSize = new AtomicInteger(); - /** Full history size. */ private final AtomicInteger fullHistSize = new AtomicInteger(); @@ -206,11 +203,13 @@ public class GridAffinityAssignmentCache { */ public void initialize(AffinityTopologyVersion topVer, List<List<ClusterNode>> affAssignment) { assert topVer.compareTo(lastVersion()) >= 0 : "[topVer = " + topVer + ", last=" + lastVersion() + ']'; + assert idealAssignment != null; GridAffinityAssignment assignment = new GridAffinityAssignment(topVer, affAssignment, idealAssignment); - affCache.put(topVer, new HistoryAffinityAssignment(assignment)); + HistoryAffinityAssignment hAff = affCache.put(topVer, new HistoryAffinityAssignment(assignment)); + head.set(assignment); for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) { @@ -223,7 +222,9 @@ public class GridAffinityAssignmentCache { } } - onHistoryAdded(assignment); + // In case if value was replaced there is no sense to clean the history. + if (hAff == null) + onHistoryAdded(); if (log.isTraceEnabled()) { log.trace("New affinity assignment [grp=" + cacheOrGrpName @@ -273,6 +274,8 @@ public class GridAffinityAssignmentCache { affCache.clear(); + fullHistSize.set(0); + head.set(new GridAffinityAssignment(AffinityTopologyVersion.NONE)); stopErr = null; @@ -484,12 +487,14 @@ public class GridAffinityAssignmentCache { GridAffinityAssignment aff = head.get(); - assert evt.type() == EVT_DISCOVERY_CUSTOM_EVT || aff.primaryPartitions(evt.eventNode().id()).isEmpty() : evt; - assert evt.type() == EVT_DISCOVERY_CUSTOM_EVT || aff.backupPartitions(evt.eventNode().id()).isEmpty() : evt; + assert evt.type() == EVT_DISCOVERY_CUSTOM_EVT || aff.primaryPartitions(evt.eventNode().id()).isEmpty() : evt; + + assert evt.type() == EVT_DISCOVERY_CUSTOM_EVT || aff.backupPartitions(evt.eventNode().id()).isEmpty() : evt; GridAffinityAssignment assignmentCpy = new GridAffinityAssignment(topVer, aff); - affCache.put(topVer, new HistoryAffinityAssignment(assignmentCpy)); + HistoryAffinityAssignment hAff = affCache.put(topVer, new HistoryAffinityAssignment(assignmentCpy)); + head.set(assignmentCpy); for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) { @@ -502,7 +507,9 @@ public class GridAffinityAssignmentCache { } } - onHistoryAdded(assignmentCpy); + // In case if value was replaced there is no sense to clean the history. + if (hAff == null) + onHistoryAdded(); } /** @@ -779,27 +786,15 @@ public class GridAffinityAssignmentCache { } /** - * @param aff Added affinity assignment. + * Cleaning the affinity history. */ - private void onHistoryAdded(GridAffinityAssignment aff) { - int fullSize = fullHistSize.incrementAndGet(); - - int size; - - if (aff.clientEventChange()) - size = histSize.get(); - else - size = histSize.incrementAndGet(); - - int rmvCnt = size - MAX_HIST_SIZE; + private void onHistoryAdded() { + if (fullHistSize.incrementAndGet() > MAX_HIST_SIZE) { + Iterator<HistoryAffinityAssignment> it = affCache.values().iterator(); - if (rmvCnt <= 0) { - if (fullSize > MAX_HIST_SIZE * 2) - rmvCnt = MAX_HIST_SIZE; - } + int rmvCnt = MAX_HIST_SIZE / 2; - if (rmvCnt > 0) { - Iterator<HistoryAffinityAssignment> it = affCache.values().iterator(); + AffinityTopologyVersion topVerRmv = null; while (it.hasNext() && rmvCnt > 0) { AffinityAssignment aff0 = it.next(); @@ -808,11 +803,14 @@ public class GridAffinityAssignmentCache { rmvCnt--; - if (!aff0.clientEventChange()) - histSize.decrementAndGet(); - fullHistSize.decrementAndGet(); + + topVerRmv = aff0.topologyVersion(); } + + topVerRmv = it.hasNext() ? it.next().topologyVersion() : topVerRmv; + + ctx.affinity().removeCachedAffinity(topVerRmv); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/09002f2e/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java index 128eaf0..e26c0ce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java @@ -26,9 +26,10 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.binary.BinaryObject; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.affinity.Affinity; @@ -63,7 +64,6 @@ import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import java.util.concurrent.ConcurrentHashMap; import static org.apache.ignite.cache.CacheMode.LOCAL; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; @@ -86,8 +86,11 @@ public class GridAffinityProcessor extends GridProcessorAdapter { /** Time to wait between errors (in milliseconds). */ private static final long ERROR_WAIT = 500; + /** Log. */ + private final IgniteLogger log; + /** Affinity map. */ - private final ConcurrentMap<AffinityAssignmentKey, IgniteInternalFuture<AffinityInfo>> affMap = new ConcurrentHashMap<>(); + private final ConcurrentSkipListMap<AffinityAssignmentKey, IgniteInternalFuture<AffinityInfo>> affMap = new ConcurrentSkipListMap<>(); /** Listener. */ private final GridLocalEventListener lsnr = new GridLocalEventListener() { @@ -131,6 +134,8 @@ public class GridAffinityProcessor extends GridProcessorAdapter { */ public GridAffinityProcessor(GridKernalContext ctx) { super(ctx); + + log = ctx.log(GridAffinityProcessor.class); } /** {@inheritDoc} */ @@ -212,6 +217,34 @@ public class GridAffinityProcessor extends GridProcessorAdapter { return affInfo != null ? F.first(affInfo.assignment().get(partId)) : null; } + /** + * Removes cached affinity instances with affinity topology versions less than {@code topVer}. + * + * @param topVer topology version. + */ + public void removeCachedAffinity(AffinityTopologyVersion topVer) { + assert topVer != null; + + int oldSize = affMap.size(); + + Iterator<Map.Entry<AffinityAssignmentKey, IgniteInternalFuture<AffinityInfo>>> it = + affMap.headMap(new AffinityAssignmentKey(topVer)).entrySet().iterator(); + + while (it.hasNext()) { + Map.Entry<AffinityAssignmentKey, IgniteInternalFuture<AffinityInfo>> entry = it.next(); + + assert entry.getValue() != null; + + if (!entry.getValue().isDone()) + continue; + + it.remove(); + } + + if (log.isDebugEnabled()) + log.debug("Affinity cached values were cleared: " + (oldSize - affMap.size())); + } + /** * Maps keys to nodes for given cache. @@ -358,6 +391,9 @@ public class GridAffinityProcessor extends GridProcessorAdapter { @SuppressWarnings("ErrorNotRethrown") @Nullable private AffinityInfo affinityCache(final String cacheName, AffinityTopologyVersion topVer) throws IgniteCheckedException { + + assert cacheName != null; + AffinityAssignmentKey key = new AffinityAssignmentKey(cacheName, topVer); IgniteInternalFuture<AffinityInfo> fut = affMap.get(key); @@ -658,7 +694,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { /** * */ - private static class AffinityAssignmentKey { + private static class AffinityAssignmentKey implements Comparable<AffinityAssignmentKey> { /** */ private String cacheName; @@ -669,11 +705,20 @@ public class GridAffinityProcessor extends GridProcessorAdapter { * @param cacheName Cache name. * @param topVer Topology version. */ - private AffinityAssignmentKey(String cacheName, @NotNull AffinityTopologyVersion topVer) { + private AffinityAssignmentKey(@NotNull String cacheName, @NotNull AffinityTopologyVersion topVer) { this.cacheName = cacheName; this.topVer = topVer; } + /** + * Current constructor should be used only in removeCachedAffinity for creating of the special keys for removing. + * + * @param topVer Topology version. + */ + private AffinityAssignmentKey(@NotNull AffinityTopologyVersion topVer) { + this.topVer = topVer; + } + /** {@inheritDoc} */ @Override public boolean equals(Object o) { if (this == o) @@ -700,6 +745,32 @@ public class GridAffinityProcessor extends GridProcessorAdapter { @Override public String toString() { return S.toString(AffinityAssignmentKey.class, this); } + + /** {@inheritDoc} */ + @Override public int compareTo(AffinityAssignmentKey o) { + assert o != null; + + if (this == o) + return 0; + + int res = this.topVer.compareTo(o.topVer); + + // Key with null cache name must be less than any key with not null cache name for the same topVer. + if (res == 0) { + if (cacheName == null && o.cacheName != null) + return -1; + + if (cacheName != null && o.cacheName == null) + return 1; + + if (cacheName == null && o.cacheName == null) + return 0; + + return cacheName.compareTo(o.cacheName); + } + + return res; + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/09002f2e/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityHistoryCleanupTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityHistoryCleanupTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityHistoryCleanupTest.java index 605cc5f..f89d9ee 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityHistoryCleanupTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityHistoryCleanupTest.java @@ -89,34 +89,34 @@ public class AffinityHistoryCleanupTest extends GridCommonAbstractTest { Ignite ignite = startGrid(0); - checkHistory(ignite, F.asList(topVer(1, 0)), 1); + checkHistory(ignite, F.asList(topVer(1, 0)), 1); //fullHistSize = 1 startGrid(1); checkHistory(ignite, F.asList( - topVer(1, 0), - topVer(2, 0), - topVer(2, 1)), + topVer(1, 0), // FullHistSize = 1. + topVer(2, 0), // FullHistSize = 2. + topVer(2, 1)), // FullHistSize = 3. 3); startGrid(2); checkHistory(ignite, F.asList( - topVer(1, 0), - topVer(2, 0), - topVer(2, 1), - topVer(3, 0), - topVer(3, 1)), + topVer(1, 0), // FullHistSize = 1. + topVer(2, 0), // FullHistSize = 2. + topVer(2, 1), // FullHistSize = 3. + topVer(3, 0), // FullHistSize = 4. + topVer(3, 1)), // FullHistSize = 5. 5); startGrid(3); checkHistory(ignite, F.asList( - topVer(2, 1), - topVer(3, 0), - topVer(3, 1), - topVer(4, 0), - topVer(4, 1)), + topVer(2, 1), // FullHistSize = 3. + topVer(3, 0), // FullHistSize = 4. + topVer(3, 1), // FullHistSize = 5. + topVer(4, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4. + topVer(4, 1)), // FullHistSize = 5. 5); client = true; @@ -126,13 +126,11 @@ public class AffinityHistoryCleanupTest extends GridCommonAbstractTest { stopGrid(4); checkHistory(ignite, F.asList( - topVer(2, 1), - topVer(3, 0), - topVer(3, 1), - topVer(4, 0), - topVer(4, 1), - topVer(5, 0), - topVer(6, 0)), + topVer(3, 1), // FullHistSize = 5. + topVer(4, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4. + topVer(4, 1), // FullHistSize = 5. + topVer(5, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4. + topVer(6, 0)), // FullHistSize = 5. 5); startGrid(4); @@ -140,15 +138,11 @@ public class AffinityHistoryCleanupTest extends GridCommonAbstractTest { stopGrid(4); checkHistory(ignite, F.asList( - topVer(2, 1), - topVer(3, 0), - topVer(3, 1), - topVer(4, 0), - topVer(4, 1), - topVer(5, 0), - topVer(6, 0), - topVer(7, 0), - topVer(8, 0)), + topVer(4, 1), // FullHistSize = 5. + topVer(5, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4. + topVer(6, 0), // FullHistSize = 5. + topVer(7, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4. + topVer(8, 0)), // FullHistSize = 5. 5); startGrid(4); @@ -156,28 +150,24 @@ public class AffinityHistoryCleanupTest extends GridCommonAbstractTest { stopGrid(4); checkHistory(ignite, F.asList( - topVer(5, 0), - topVer(6, 0), - topVer(7, 0), - topVer(8, 0), - topVer(9, 0), - topVer(10, 0)), - 0); + topVer(6, 0), // FullHistSize = 5. + topVer(7, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4. + topVer(8, 0), // FullHistSize = 5. + topVer(9, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4. + topVer(10, 0)), // FullHistSize = 5. + 5); client = false; startGrid(4); checkHistory(ignite, F.asList( - topVer(5, 0), - topVer(6, 0), - topVer(7, 0), - topVer(8, 0), - topVer(9, 0), - topVer(10, 0), - topVer(11, 0), - topVer(11, 1)), - 2); + topVer(8, 0), // FullHistSize = 5. + topVer(9, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4. + topVer(10, 0), // FullHistSize = 5. + topVer(11, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4. + topVer(11, 1)), // FullHistSize = 5. + 5); } finally { if (histProp != null) @@ -203,9 +193,9 @@ public class AffinityHistoryCleanupTest extends GridCommonAbstractTest { for (GridCacheContext cctx : proc.context().cacheContexts()) { GridAffinityAssignmentCache aff = GridTestUtils.getFieldValue(cctx.affinity(), "aff"); - AtomicInteger histSize = GridTestUtils.getFieldValue(aff, "histSize"); + AtomicInteger fullHistSize = GridTestUtils.getFieldValue(aff, "fullHistSize"); - assertEquals(expSize, histSize.get()); + assertEquals(expSize, fullHistSize.get()); Map<AffinityTopologyVersion, Object> cache = GridTestUtils.getFieldValue(aff, "affCache"); http://git-wip-us.apache.org/repos/asf/ignite/blob/09002f2e/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorMemoryLeakTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorMemoryLeakTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorMemoryLeakTest.java new file mode 100644 index 0000000..3b6857d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorMemoryLeakTest.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.affinity; + +import java.util.concurrent.ConcurrentSkipListMap; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.testframework.junits.common.GridCommonTest; + +import static org.apache.ignite.IgniteSystemProperties.IGNITE_AFFINITY_HISTORY_SIZE; +import static org.apache.ignite.IgniteSystemProperties.getInteger; + +/** + * Tests for {@link GridAffinityProcessor}. + */ +@GridCommonTest(group = "Affinity Processor") +public class GridAffinityProcessorMemoryLeakTest extends GridCommonAbstractTest { + /** Max value for affinity history size name. Should be the same as in GridAffinityAssignmentCache.MAX_HIST_SIZE */ + private final int MAX_HIST_SIZE = getInteger(IGNITE_AFFINITY_HISTORY_SIZE, 100); + + /** Cache name. */ + private static final String CACHE_NAME = "cache"; + + /** IP finder. */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setForceServerMode(true); + + discoSpi.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(discoSpi); + + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setName(CACHE_NAME); + + cacheCfg.setStoreKeepBinary(true); + + cacheCfg.setCacheMode(CacheMode.LOCAL); + + cfg.setCacheConfiguration(cacheCfg); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** + * Test affinity functions caching and clean up. + * + * @throws Exception In case of any exception. + */ + public void testAffinityProcessor() throws Exception { + Ignite ignite = startGrid(0); + + IgniteKernal grid = (IgniteKernal)grid(0); + + IgniteCache<String, String> cache; + + IgniteCache<String, String> globalCache = getOrCreateGlobalCache(ignite); + + IgniteDataStreamer<String, String> globalStreamer; + + int count = MAX_HIST_SIZE * 4; + + int size; + + do { + try { + cache = createLocalCache(ignite, count); + + cache.put("Key" + count, "Value" + count); + + cache.destroy(); + + globalStreamer = createGlobalStreamer(ignite, globalCache); + + globalStreamer.addData("GlobalKey" + count, "GlobalValue" + count); + + globalStreamer.flush(); + + globalStreamer.close(); + + size = ((ConcurrentSkipListMap)GridTestUtils.getFieldValue(grid.context().affinity(), "affMap")).size(); + + assertTrue("Cache has size that bigger then expected [size=" + size + "" + + ", expLimit=" + MAX_HIST_SIZE * 3 + "]", size < MAX_HIST_SIZE * 3); + } + catch (Exception e) { + fail("Error was handled [" + e.getMessage() + "]"); + } + } + while (count-- > 0); + } + + /** + * Creates global cache. + * + * @param ignite instance of {@code Ignite}. + * @param id unique id for local cache. + * @return local cache instance. + */ + private static IgniteCache<String, String> createLocalCache(Ignite ignite, long id) { + final String cacheName = "localCache" + id; + + final CacheConfiguration<String, String> cCfg = new CacheConfiguration<>(); + + cCfg.setName(cacheName); + + cCfg.setCacheMode(CacheMode.LOCAL); + + cCfg.setGroupName("some group"); + + ignite.destroyCache(cacheName); // Local cache is not really local - reference can be kept by other nodes if restart during the load happens. + + return ignite.createCache(cCfg).withKeepBinary(); + } + + /** + * Gets or creates global cache. + * + * @param ignite instance of {@code Ignite}. + * @return global cache instance. + */ + private static IgniteCache<String, String> getOrCreateGlobalCache(Ignite ignite) { + final String cacheName = "GlobalCache"; + + final CacheConfiguration<String, String> cCfg = new CacheConfiguration<>(); + + cCfg.setName(cacheName); + + cCfg.setStoreKeepBinary(true); + + cCfg.setCacheMode(CacheMode.PARTITIONED); + + cCfg.setOnheapCacheEnabled(false); + + cCfg.setCopyOnRead(false); + + cCfg.setBackups(0); + + cCfg.setWriteBehindEnabled(false); + + cCfg.setReadThrough(false); + + return ignite.getOrCreateCache(cCfg).withKeepBinary(); + } + + /** + * Creates streamer for global cache. + * + * @param ignite instance of {@code Ignite}. + * @param cache instance of global cache. + * @return instance of {@code IgniteDataStreamer}. + */ + private static IgniteDataStreamer<String, String> createGlobalStreamer(Ignite ignite, + IgniteCache<String, String> cache) { + IgniteDataStreamer<String, String> streamer = ignite.dataStreamer(cache.getName()); + + streamer.allowOverwrite(true); + + streamer.skipStore(true); + + streamer.keepBinary(false); + + return streamer; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/09002f2e/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java index e71a569..cc93c5b 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java @@ -44,6 +44,7 @@ import org.apache.ignite.internal.IgniteSlowClientDetectionSelfTest; import org.apache.ignite.internal.MarshallerContextLockingSelfTest; import org.apache.ignite.internal.TransactionsMXBeanImplTest; import org.apache.ignite.internal.managers.IgniteDiagnosticMessagesTest; +import org.apache.ignite.internal.processors.affinity.GridAffinityProcessorMemoryLeakTest; import org.apache.ignite.internal.processors.affinity.GridAffinityProcessorRendezvousSelfTest; import org.apache.ignite.internal.processors.cache.GridLocalIgniteSerializationTest; import org.apache.ignite.internal.processors.cache.GridProjectionForCachesOnDaemonNodeSelfTest; @@ -133,6 +134,7 @@ public class IgniteBasicTestSuite extends TestSuite { GridTestUtils.addTestIfNeeded(suite, GridReleaseTypeSelfTest.class, ignoredTests); suite.addTestSuite(GridProductVersionSelfTest.class); suite.addTestSuite(GridAffinityProcessorRendezvousSelfTest.class); + suite.addTestSuite(GridAffinityProcessorMemoryLeakTest.class); suite.addTestSuite(GridClosureProcessorSelfTest.class); suite.addTestSuite(GridClosureProcessorRemoteTest.class); suite.addTestSuite(GridClosureSerializationTest.class);
