This is an automated email from the ASF dual-hosted git repository. zstan pushed a commit to branch ignite-2.18 in repository https://gitbox.apache.org/repos/asf/ignite.git
commit fba9935f5855537850f9280d71577cd18f7f580a Author: Maksim Timonin <[email protected]> AuthorDate: Fri Feb 13 15:52:51 2026 +0300 IGNITE-27696 Fix IndexQuery can query backup partition if setPartition is used (#12679) (cherry picked from commit 4e2e691fa9076e630eba928202d59a18b96bb5fd) --- .../processors/cache/query/CacheQuery.java | 23 ++++++++++------------ .../query/GridCacheDistributedQueryFuture.java | 17 +++++++++++----- .../cache/query/GridCacheQueryManager.java | 6 +++--- .../cache/query/IndexQueryPartitionTest.java | 5 +++-- 4 files changed, 28 insertions(+), 23 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java index 6f394c2efce..9c6d6782b1c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java @@ -62,7 +62,6 @@ import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.internal.util.lang.GridIterator; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.P1; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.A; @@ -840,9 +839,12 @@ public class CacheQuery<T> { } /** + * Collects query data nodes matching specified {@code prj} and {@code part}. + * * @param cctx Cache context. * @param prj Projection (optional). - * @return Collection of data nodes in provided projection (if any). + * @param part Partition (optional). + * @return Collection of data nodes matching specified {@code prj} and {@code part}. * @throws IgniteCheckedException If partition number is invalid. */ private static Collection<ClusterNode> nodes(final GridCacheContext<?, ?> cctx, @@ -856,19 +858,14 @@ public class CacheQuery<T> { if (prj == null && part == null) return affNodes; - if (part != null && part >= cctx.affinity().partitions()) - throw new IgniteCheckedException("Invalid partition number: " + part); + if (part != null) { + if (part >= cctx.affinity().partitions()) + throw new IgniteCheckedException("Invalid partition number: " + part); - final Set<ClusterNode> owners = - part == null ? Collections.<ClusterNode>emptySet() : new HashSet<>(cctx.topology().owners(part, topVer)); + affNodes = cctx.topology().nodes(part, topVer); + } - return F.view(affNodes, new P1<ClusterNode>() { - @Override public boolean apply(ClusterNode n) { - return cctx.discovery().cacheAffinityNode(n, cctx.name()) && - (prj == null || prj.node(n.id()) != null) && - (part == null || owners.contains(n)); - } - }); + return prj == null ? affNodes : F.view(affNodes, n -> prj.node(n.id()) != null); } /** */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java index 2b12a3b784c..515cc7323b5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java @@ -25,6 +25,7 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.IgniteCheckedException; @@ -38,6 +39,7 @@ import org.apache.ignite.internal.processors.cache.query.reducer.NodePageStream; import org.apache.ignite.internal.processors.cache.query.reducer.TextQueryReducer; import org.apache.ignite.internal.processors.cache.query.reducer.UnsortedCacheQueryReducer; import org.apache.ignite.internal.util.lang.GridPlainCallable; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.INDEX; @@ -92,7 +94,7 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu qryMgr = (GridCacheDistributedQueryManager<K, V>)ctx.queries(); if (qry.query().partition() != null) - nodes = Collections.singletonList(node(nodes)); + nodes = Collections.singletonList(cctx.isReplicated() ? localOrRemoteNode(nodes) : F.first(nodes)); streams = new ConcurrentHashMap<>(nodes.size()); @@ -118,18 +120,23 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu } /** - * @return Nodes for query execution. + * @return A local node if available, otherwise a random node from the given collection. */ - private ClusterNode node(Collection<ClusterNode> nodes) { + private ClusterNode localOrRemoteNode(Collection<ClusterNode> nodes) { + int remoteNodeIdx = ThreadLocalRandom.current().nextInt(nodes.size()); + ClusterNode rmtNode = null; for (ClusterNode node : nodes) { if (node.isLocal()) return node; - rmtNode = node; + if (remoteNodeIdx-- == 0) + rmtNode = node; } + assert rmtNode != null; + return rmtNode; } @@ -282,7 +289,7 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu GridCacheQueryRequest req = GridCacheQueryRequest.cancelRequest(cctx, reqId, fields()); if (nodeId.equals(cctx.localNodeId())) { - // Process cancel query directly (without sending) for local node, + // Process cancel query directly (without sending) for local node. cctx.closures().callLocalSafe(new GridPlainCallable<Object>() { @Override public Object call() { qryMgr.processQueryRequest(cctx.localNodeId(), req); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 8fe2c925bb1..2db3ed5263e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -1121,11 +1121,11 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte int[] parts = null; if (part != null) { - final GridDhtLocalPartition locPart = cctx.dht().topology().localPartition(part); + AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion(); - if (locPart == null || locPart.state() != OWNING) { + if (cctx.isPartitioned() && !cctx.affinity().primaryByPartition(cctx.localNode(), part, topVer)) { throw new CacheInvalidStateException("Failed to execute index query because required partition " + - "has not been found on local node [cacheName=" + cctx.name() + ", part=" + part + "]"); + "is not primary on local node [cacheName=" + cctx.name() + ", part=" + part + ", topVer=" + topVer + ']'); } parts = new int[] {part}; diff --git a/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryPartitionTest.java b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryPartitionTest.java index 73ea7f30532..8fe205ef7d9 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryPartitionTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryPartitionTest.java @@ -78,6 +78,7 @@ public class IndexQueryPartitionTest extends GridCommonAbstractTest { .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) .setCacheMode(cacheMode) .setIndexedTypes(Integer.class, Person.class) + .setBackups(1) .setAffinity(new RendezvousAffinityFunction().setPartitions(100)); cfg.setCacheConfiguration(ccfg); @@ -136,7 +137,7 @@ public class IndexQueryPartitionTest extends GridCommonAbstractTest { } } - assertEquals(sendReq, TestRecordingCommunicationSpi.spi(grid()).recordedMessages(true).size()); + assertEquals("part=" + part, sendReq, TestRecordingCommunicationSpi.spi(grid()).recordedMessages(true).size()); } } @@ -181,7 +182,7 @@ public class IndexQueryPartitionTest extends GridCommonAbstractTest { GridTestUtils.assertThrows(null, () -> grid().cache("CACHE").query(qry).getAll(), client ? IgniteException.class : CacheInvalidStateException.class, client ? "Failed to execute local index query on a client node." : - "Failed to execute index query because required partition has not been found on local node"); + "Failed to execute index query because required partition is not primary on local node"); } else assertTrue(!grid().cache("CACHE").query(qry).getAll().isEmpty());
