Repository: ignite Updated Branches: refs/heads/ignite-2.5 d3140981c -> 0061e7c79
IGNITE-7628 SqlQuery hangs indefinitely with additional not registered in baseline node. Signed-off-by: Andrey Gura <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0061e7c7 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0061e7c7 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0061e7c7 Branch: refs/heads/ignite-2.5 Commit: 0061e7c79f5c7137f3be0a5100b50189c21aa6cd Parents: d314098 Author: EdShangGG <[email protected]> Authored: Sat Apr 28 19:27:27 2018 +0300 Committer: Andrey Gura <[email protected]> Committed: Sat Apr 28 19:29:59 2018 +0300 ---------------------------------------------------------------------- .../discovery/GridDiscoveryManager.java | 52 ++++-- .../near/IgniteSqlQueryWithBaselineTest.java | 184 +++++++++++++++++++ .../IgniteCacheQuerySelfTestSuite2.java | 2 + 3 files changed, 221 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0061e7c7/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 261e73d..15badf2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -367,7 +367,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { */ public void addCacheGroup(CacheGroupDescriptor grpDesc, IgnitePredicate<ClusterNode> filter, CacheMode cacheMode) { CacheGroupAffinity old = registeredCacheGrps.put(grpDesc.groupId(), - new CacheGroupAffinity(grpDesc.cacheOrGroupName(), filter, cacheMode)); + new CacheGroupAffinity(grpDesc.cacheOrGroupName(), filter, cacheMode, grpDesc.persistenceEnabled())); assert old == null : old; } @@ -2387,12 +2387,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { assert !rmtNodes.contains(loc) : "Remote nodes collection shouldn't contain local node" + " [rmtNodes=" + rmtNodes + ", loc=" + loc + ']'; - Map<Integer, List<ClusterNode>> allCacheNodes = U.newHashMap(allNodes.size()); - Map<Integer, List<ClusterNode>> cacheGrpAffNodes = U.newHashMap(allNodes.size()); - Set<ClusterNode> rmtNodesWithCaches = new TreeSet<>(NodeOrderComparator.getInstance()); - - fillAffinityNodeCaches(allNodes, allCacheNodes, cacheGrpAffNodes, rmtNodesWithCaches); - BaselineTopology blt = state.baselineTopology(); if (blt != null) { @@ -2435,6 +2429,13 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { baselineNodes = null; } + Map<Integer, List<ClusterNode>> allCacheNodes = U.newHashMap(allNodes.size()); + Map<Integer, List<ClusterNode>> cacheGrpAffNodes = U.newHashMap(allNodes.size()); + Set<ClusterNode> rmtNodesWithCaches = new TreeSet<>(NodeOrderComparator.getInstance()); + + fillAffinityNodeCaches(allNodes, allCacheNodes, cacheGrpAffNodes, rmtNodesWithCaches, + nodeIdToConsIdx == null ? null : nodeIdToConsIdx.keySet()); + return new DiscoCache( topVer, state, @@ -3127,23 +3128,29 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** Cache mode. */ private final CacheMode cacheMode; + /** Persistent cache group or not. */ + private final boolean persistentCacheGrp; + /** * @param name Name. * @param cacheFilter Node filter. * @param cacheMode Cache mode. + * @param persistentCacheGrp Persistence is configured for cache or not. */ CacheGroupAffinity( - String name, - IgnitePredicate<ClusterNode> cacheFilter, - CacheMode cacheMode) { + String name, + IgnitePredicate<ClusterNode> cacheFilter, + CacheMode cacheMode, + boolean persistentCacheGrp) { this.name = name; this.cacheFilter = cacheFilter; this.cacheMode = cacheMode; + this.persistentCacheGrp = persistentCacheGrp; } /** {@inheritDoc} */ @Override public String toString() { - return "CacheGroupAffinity [name=" + name + ']'; + return S.toString(CacheGroupAffinity.class, this); } } @@ -3268,14 +3275,19 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** * Fills affinity node caches. - * * @param allNodes All nodes. * @param allCacheNodes All cache nodes. * @param cacheGrpAffNodes Cache group aff nodes. * @param rmtNodesWithCaches Rmt nodes with caches. - */ - private void fillAffinityNodeCaches(List<ClusterNode> allNodes, Map<Integer, List<ClusterNode>> allCacheNodes, - Map<Integer, List<ClusterNode>> cacheGrpAffNodes, Set<ClusterNode> rmtNodesWithCaches) { + * @param bltNodes Baseline node ids. + */ + private void fillAffinityNodeCaches( + List<ClusterNode> allNodes, + Map<Integer, List<ClusterNode>> allCacheNodes, + Map<Integer, List<ClusterNode>> cacheGrpAffNodes, + Set<ClusterNode> rmtNodesWithCaches, + Set<UUID> bltNodes + ) { for (ClusterNode node : allNodes) { assert node.order() != 0 : "Invalid node order [locNode=" + localNode() + ", node=" + node + ']'; assert !node.isDaemon(); @@ -3285,6 +3297,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { Integer grpId = e.getKey(); if (CU.affinityNode(node, grpAff.cacheFilter)) { + if (grpAff.persistentCacheGrp && bltNodes != null && !bltNodes.contains(node.id())) // Filter out. + continue; + List<ClusterNode> nodes = cacheGrpAffNodes.get(grpId); if (nodes == null) @@ -3324,7 +3339,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { Map<Integer, List<ClusterNode>> cacheGrpAffNodes = U.newHashMap(allNodes.size()); Set<ClusterNode> rmtNodesWithCaches = new TreeSet<>(NodeOrderComparator.getInstance()); - fillAffinityNodeCaches(allNodes, allCacheNodes, cacheGrpAffNodes, rmtNodesWithCaches); + Map<UUID, Short> nodeIdToConsIdx = discoCache.nodeIdToConsIdx; + + fillAffinityNodeCaches(allNodes, allCacheNodes, cacheGrpAffNodes, rmtNodesWithCaches, + nodeIdToConsIdx == null ? null : nodeIdToConsIdx.keySet()); return new DiscoCache( topVer, @@ -3340,7 +3358,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { cacheGrpAffNodes, discoCache.nodeMap, discoCache.alives, - discoCache.nodeIdToConsIdx, + nodeIdToConsIdx, discoCache.consIdxToNodeId, discoCache.minimumNodeVersion(), discoCache.minimumServerNodeVersion()); http://git-wip-us.apache.org/repos/asf/ignite/blob/0061e7c7/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteSqlQueryWithBaselineTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteSqlQueryWithBaselineTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteSqlQueryWithBaselineTest.java new file mode 100644 index 0000000..203a319 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteSqlQueryWithBaselineTest.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.processors.cache.distributed.near; + + +import java.io.Serializable; +import java.util.Collection; + +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.query.SqlQuery; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import javax.cache.Cache; + +/** + * + */ +public class IgniteSqlQueryWithBaselineTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setDataStorageConfiguration( + new DataStorageConfiguration() + .setDefaultDataRegionConfiguration( + new DataRegionConfiguration() + .setMaxSize(200 * 1024 * 1024) + .setPersistenceEnabled(true) + ) + ); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(disco); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** */ + public static class C1 implements Serializable { + /** */ + private static final long serialVersionUID = 1L; + + /** */ + public C1(int id) { + this.id = id; + } + + /** */ + @QuerySqlField(index = true) + protected Integer id; + } + + /** */ + public static class C2 implements Serializable { + /** */ + private static final long serialVersionUID = 1L; + + /** */ + C2(int id) { + this.id = id; + } + + /** */ + @QuerySqlField(index = true) + protected Integer id; + } + + /** + * @throws Exception If failed. + */ + public void testQueryWithNodeNotInBLT() throws Exception { + startGrids(2); + + grid(0).cluster().active(true); + + startGrid(2); //Start extra node. + + doQuery(); + } + + /** + * @throws Exception If failed. + */ + public void testQueryWithoutBLTNode() throws Exception { + startGrids(2); + + grid(0).cluster().active(true); + + startGrid(2); //Start extra node. + stopGrid(1); + + doQuery(); + } + + /** + * @throws Exception If failed. + */ + public void testQueryFromNotBLTNode() throws Exception { + startGrid(1); + + grid(1).cluster().active(true); + + startGrid(0); //Start extra node. + + doQuery(); + } + + /** + * + */ + private void doQuery() { + CacheConfiguration<Integer, C1> c1Conf = new CacheConfiguration<>("C1"); + c1Conf.setIndexedTypes(Integer.class, C1.class).setBackups(2); + + CacheConfiguration<Integer, C2> c2Conf = new CacheConfiguration<>("C2"); + c2Conf.setIndexedTypes(Integer.class, C2.class).setBackups(2); + + final IgniteCache<Integer, C1> cache = grid(0).getOrCreateCache(c1Conf); + + final IgniteCache<Integer, C2> cache1 = grid(0).getOrCreateCache(c2Conf); + + for (int i = 0; i < 100; i++) { + cache.put(i, new C1(i)); + + cache1.put(i, new C2(i)); + } + + String sql = "SELECT C1.*" + + " from C1 inner join \"C2\".C2 as D on C1.id = D.id" + + " order by C1.id asc"; + + SqlQuery<Integer, C1> qry = new SqlQuery<>(C1.class, sql); + + qry.setDistributedJoins(true); + + log.info("before query run..."); + + Collection<Cache.Entry<Integer, C1>> res = cache.query(qry).getAll(); + + log.info("result size: " + res.size()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/0061e7c7/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java index 11d98e2..5b888ce 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java @@ -33,6 +33,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQ import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNodeRestartSelfTest2; import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNodeRestartTxSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.near.IgniteSqlQueryWithBaselineTest; import org.apache.ignite.internal.processors.cache.index.DynamicColumnsConcurrentAtomicPartitionedSelfTest; import org.apache.ignite.internal.processors.cache.index.DynamicColumnsConcurrentAtomicReplicatedSelfTest; import org.apache.ignite.internal.processors.cache.index.DynamicColumnsConcurrentTransactionalPartitionedSelfTest; @@ -87,6 +88,7 @@ public class IgniteCacheQuerySelfTestSuite2 extends TestSuite { suite.addTestSuite(IgniteCacheClientQueryReplicatedNodeRestartSelfTest.class); suite.addTestSuite(IgniteCacheQueryNodeFailTest.class); suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest.class); + suite.addTestSuite(IgniteSqlQueryWithBaselineTest.class); suite.addTestSuite(IgniteChangingBaselineCacheQueryNodeRestartSelfTest.class); suite.addTestSuite(IgniteStableBaselineCacheQueryNodeRestartsSelfTest.class); suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest2.class);
