This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch gg-18540 in repository https://gitbox.apache.org/repos/asf/ignite.git
commit a813e047f8b091ceaba6820501e6bde9e79274a6 Author: tledkov <[email protected]> AuthorDate: Fri Apr 26 19:54:09 2019 +0300 GG-17434 Fix memory leak on unstable topology caused by partition reservation --- .../query/h2/twostep/PartitionReservationKey.java | 13 ++ .../h2/twostep/PartitionReservationManager.java | 73 +++++-- .../query/MemLeakOnSqlWithClientReconnectTest.java | 218 +++++++++++++++++++++ .../testsuites/IgniteCacheQuerySelfTestSuite6.java | 4 +- 4 files changed, 290 insertions(+), 18 deletions(-) diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationKey.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationKey.java index 73984bc..d02b6b8 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationKey.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationKey.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.query.h2.twostep; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; /** * Partition reservation key. @@ -47,6 +48,13 @@ public class PartitionReservationKey { return cacheName; } + /** + * @return Topology version of reservation. 
+ */ + public AffinityTopologyVersion topologyVersion() { + return topVer; + } + /** {@inheritDoc} */ @Override public boolean equals(Object o) { if (this == o) @@ -69,4 +77,9 @@ public class PartitionReservationKey { return res; } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(PartitionReservationKey.class, this); + } } diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationManager.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationManager.java index e91d7d5..89d7a01 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationManager.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationManager.java @@ -16,13 +16,24 @@ package org.apache.ignite.internal.processors.query.h2.twostep; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.PartitionLossPolicy; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheInvalidStateException; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware; 
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionsReservation; @@ -30,14 +41,6 @@ import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.jetbrains.annotations.Nullable; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_SAFE; import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_SAFE; import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE; @@ -45,19 +48,24 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.topolo import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING; /** - * Class responsible for partition reservation for queries executed on local node. - * Prevents partitions from being eveicted from node during query execution. + * Class responsible for partition reservation for queries executed on local node. Prevents partitions from being + * evicted from node during query execution. */ -public class PartitionReservationManager { +public class PartitionReservationManager implements PartitionsExchangeAware { /** Special instance of reservable object for REPLICATED caches. */ private static final ReplicatedReservable REPLICATED_RESERVABLE = new ReplicatedReservable(); /** Kernal context. */ private final GridKernalContext ctx; - /** Reservations. */ + /** Group reservations cache. 
When affinity version is not changed and all primary partitions must be reserved + * we get group reservation from this map instead of create new reservation group. + */ private final ConcurrentMap<PartitionReservationKey, GridReservable> reservations = new ConcurrentHashMap<>(); + /** Logger. */ + private final IgniteLogger log; + /** * Constructor. * @@ -65,11 +73,15 @@ public class PartitionReservationManager { */ public PartitionReservationManager(GridKernalContext ctx) { this.ctx = ctx; + + log = ctx.log(PartitionReservationManager.class); + + ctx.cache().context().exchange().registerExchangeAwareComponent(this); } /** * @param cacheIds Cache IDs. - * @param topVer Topology version. + * @param reqTopVer Topology version from request. * @param explicitParts Explicit partitions list. * @param nodeId Node ID. * @param reqId Request ID. @@ -78,18 +90,18 @@ public class PartitionReservationManager { */ public PartitionReservation reservePartitions( @Nullable List<Integer> cacheIds, - AffinityTopologyVersion topVer, + AffinityTopologyVersion reqTopVer, final int[] explicitParts, UUID nodeId, long reqId ) throws IgniteCheckedException { - assert topVer != null; + assert reqTopVer != null; + + AffinityTopologyVersion topVer = ctx.cache().context().exchange().lastAffinityChangedTopologyVersion(reqTopVer); if (F.isEmpty(cacheIds)) return new PartitionReservation(Collections.emptyList()); - List<GridReservable> reserved = new ArrayList<>(); - Collection<Integer> partIds; if (explicitParts == null) @@ -103,6 +115,8 @@ public class PartitionReservationManager { partIds.add(explicitPart); } + List<GridReservable> reserved = new ArrayList<>(); + for (int i = 0; i < cacheIds.size(); i++) { GridCacheContext<?, ?> cctx = ctx.cache().context().cacheContext(cacheIds.get(i)); @@ -308,6 +322,31 @@ public class PartitionReservationManager { } /** + * Cleanup group reservations cache on change affinity version. 
+ */ + @Override public void onDoneAfterTopologyUnlock(final GridDhtPartitionsExchangeFuture fut) { + try { + // Must not do anything at the exchange thread. Dispatch to the management thread pool. + ctx.closure().runLocal(() -> { + AffinityTopologyVersion topVer = ctx.cache().context().exchange() + .lastAffinityChangedTopologyVersion(fut.topologyVersion()); + + reservations.forEach((key, r) -> { + if (r != REPLICATED_RESERVABLE && !F.eq(key.topologyVersion(), topVer)) { + assert r instanceof GridDhtPartitionsReservation; + + ((GridDhtPartitionsReservation)r).invalidate(); + } + }); + }, + GridIoPolicy.MANAGEMENT_POOL); + } + catch (Throwable e) { + log.error("Unexpected exception on start reservations cleanup", e); + } + } + + /** * Mapper fake reservation object for replicated caches. */ private static class ReplicatedReservable implements GridReservable { diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/MemLeakOnSqlWithClientReconnectTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/MemLeakOnSqlWithClientReconnectTest.java new file mode 100644 index 0000000..ad56e11 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/MemLeakOnSqlWithClientReconnectTest.java @@ -0,0 +1,218 @@ +/* + * Copyright 2019 GridGain Systems, Inc. and Contributors. + * + * Licensed under the GridGain Community Edition License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cache.query.FieldsQueryCursor; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest; +import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.logger.NullLogger; +import org.apache.ignite.testframework.GridTestUtils; +import org.junit.Test; + +/** + * Tests for group reservation leaks at the PartitionReservationManager on unstable topology. + */ +public class MemLeakOnSqlWithClientReconnectTest extends AbstractIndexingCommonTest { + /** Keys count. */ + private static final int KEY_CNT = 10; + + /** Base number of query iterations. */ + private static final int ITERS = 2000; + + /** Replicated cache schema name. */ + private static final String REPL_SCHEMA = "REPL"; + + /** Partitioned cache schema name. 
*/ + private static final String PART_SCHEMA = "PART"; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + if (igniteInstanceName.startsWith("cli")) + cfg.setClientMode(true).setGridLogger(new NullLogger()); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + startGrid(); + + IgniteCache<Long, Long> partCache = grid().createCache(new CacheConfiguration<Long, Long>() + .setName("part") + .setSqlSchema("PART") + .setQueryEntities(Collections.singleton(new QueryEntity(Long.class, Long.class) + .setTableName("test") + .addQueryField("id", Long.class.getName(), null) + .addQueryField("val", Long.class.getName(), null) + .setKeyFieldName("id") + .setValueFieldName("val") + )) + .setAffinity(new RendezvousAffinityFunction(false, 10))); + + IgniteCache<Long, Long> replCache = grid().createCache(new CacheConfiguration<Long, Long>() + .setName("repl") + .setSqlSchema("REPL") + .setQueryEntities(Collections.singleton(new QueryEntity(Long.class, Long.class) + .setTableName("test") + .addQueryField("id", Long.class.getName(), null) + .addQueryField("val", Long.class.getName(), null) + .setKeyFieldName("id") + .setValueFieldName("val"))) + .setCacheMode(CacheMode.REPLICATED)); + + for (long i = 0; i < KEY_CNT; ++i) { + partCache.put(i, i); + replCache.put(i, i); + } + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + super.afterTest(); + } + + /** + * Test partition group reservation leaks on partitioned cache. + * + * @throws Exception On error. + */ + @Test + public void testPartitioned() throws Exception { + checkReservationLeak(false); + } + + /** + * Test partition group reservation leaks on replicated cache. + * + * @throws Exception On error. 
+ */ + @Test + public void testReplicated() throws Exception { + checkReservationLeak(true); + } + + /** + * Check partition group reservation leaks. + * + * @param replicated Flag to run query on partitioned or replicated cache. + * @throws Exception On error. + */ + private void checkReservationLeak(boolean replicated) throws Exception { + final AtomicInteger cliNum = new AtomicInteger(); + final AtomicBoolean end = new AtomicBoolean(); + + IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(() -> { + String name = "cli_" + cliNum.getAndIncrement(); + + while (!end.get()) { + try { + startGrid(name); + + U.sleep(10); + + stopGrid(name); + } + catch (Exception e) { + fail("Unexpected exception on start test client node"); + } + } + }, + 10, "cli-restart"); + + try { + + String mainCliName = "cli-main"; + + IgniteEx cli = startGrid(mainCliName); + + // Warm up. + runQuery(cli, ITERS, replicated); + + int baseReservations = reservationCount(grid()); + + // Run multiple queries on unstable topology. + runQuery(cli, ITERS * 10, replicated); + + int curReservations = reservationCount(grid()); + + assertTrue("Reservations leaks: [base=" + baseReservations + ", cur=" + curReservations + ']', + curReservations < baseReservations * 2); + + log.info("Reservations OK: [base=" + baseReservations + ", cur=" + curReservations + ']'); + } + finally { + end.set(true); + } + + fut.get(); + } + + /** + * @param ign Ignite. + * @param iters Run query 'iters' times + * @param repl Run on replicated or partitioned cache. + */ + private void runQuery(IgniteEx ign, int iters, boolean repl) { + for (int i = 0; i < iters; ++i) + sql(ign, repl ? REPL_SCHEMA : PART_SCHEMA,"SELECT * FROM test").getAll(); + } + + /** + * @param ign Ignite instance. + * @param sql SQL query. + * @param args Query parameters. + * @return Results cursor. + */ + private FieldsQueryCursor<List<?>> sql(IgniteEx ign, String schema, String sql, Object... 
args) { + return ign.context().query().querySqlFields(new SqlFieldsQuery(sql) + .setSchema(schema) + .setArgs(args), false); + } + + /** + * @param ign Ignite instance. + * @return Count of reservations. + */ + private static int reservationCount(IgniteEx ign) { + IgniteH2Indexing idx = (IgniteH2Indexing)ign.context().query().getIndexing(); + + Map reservations = GridTestUtils.getFieldValue(idx.partitionReservationManager(), "reservations"); + + return reservations.size(); + } +} diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite6.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite6.java index a3223df..00a7dea 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite6.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite6.java @@ -36,6 +36,7 @@ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheCon import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedAtomicOneNodeTest; import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedAtomicSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTest; +import org.apache.ignite.internal.processors.query.MemLeakOnSqlWithClientReconnectTest; import org.junit.runner.RunWith; import org.junit.runners.Suite; @@ -63,7 +64,8 @@ import org.junit.runners.Suite; CacheContinuousWithTransformerRandomOperationsTest.class, CacheContinuousQueryRandomOperationsTest.class, StaticCacheDdlTest.class, - StaticCacheDdlKeepStaticConfigurationTest.class + StaticCacheDdlKeepStaticConfigurationTest.class, + MemLeakOnSqlWithClientReconnectTest.class, }) public class IgniteCacheQuerySelfTestSuite6 { }
