This is an automated email from the ASF dual-hosted git repository.
timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 4e2e691fa90 IGNITE-27696 Fix IndexQuery can query backup partition if
setPartition is used (#12679)
4e2e691fa90 is described below
commit 4e2e691fa9076e630eba928202d59a18b96bb5fd
Author: Maksim Timonin <[email protected]>
AuthorDate: Fri Feb 13 15:52:51 2026 +0300
IGNITE-27696 Fix IndexQuery can query backup partition if setPartition is
used (#12679)
---
.../processors/cache/query/CacheQuery.java | 23 ++++++++++------------
.../query/GridCacheDistributedQueryFuture.java | 17 +++++++++++-----
.../cache/query/GridCacheQueryManager.java | 6 +++---
.../cache/query/IndexQueryPartitionTest.java | 5 +++--
4 files changed, 28 insertions(+), 23 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
index 6f394c2efce..9c6d6782b1c 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
@@ -62,7 +62,6 @@ import
org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.lang.GridIterator;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
@@ -840,9 +839,12 @@ public class CacheQuery<T> {
}
/**
+ * Collects query data nodes matching specified {@code prj} and {@code part}.
+ *
* @param cctx Cache context.
* @param prj Projection (optional).
- * @return Collection of data nodes in provided projection (if any).
+ * @param part Partition (optional).
+ * @return Collection of data nodes matching specified {@code prj} and {@code part}.
* @throws IgniteCheckedException If partition number is invalid.
*/
private static Collection<ClusterNode> nodes(final GridCacheContext<?, ?>
cctx,
@@ -856,19 +858,14 @@ public class CacheQuery<T> {
if (prj == null && part == null)
return affNodes;
- if (part != null && part >= cctx.affinity().partitions())
- throw new IgniteCheckedException("Invalid partition number: " +
part);
+ if (part != null) {
+ if (part >= cctx.affinity().partitions())
+ throw new IgniteCheckedException("Invalid partition number: "
+ part);
- final Set<ClusterNode> owners =
- part == null ? Collections.<ClusterNode>emptySet() : new
HashSet<>(cctx.topology().owners(part, topVer));
+ affNodes = cctx.topology().nodes(part, topVer);
+ }
- return F.view(affNodes, new P1<ClusterNode>() {
- @Override public boolean apply(ClusterNode n) {
- return cctx.discovery().cacheAffinityNode(n, cctx.name()) &&
- (prj == null || prj.node(n.id()) != null) &&
- (part == null || owners.contains(n));
- }
- });
+ return prj == null ? affNodes : F.view(affNodes, n -> prj.node(n.id())
!= null);
}
/** */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
index 9ed56f805c5..58a58914824 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
@@ -25,6 +25,7 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteCheckedException;
@@ -38,6 +39,7 @@ import
org.apache.ignite.internal.processors.cache.query.reducer.NodePageStream;
import
org.apache.ignite.internal.processors.cache.query.reducer.TextQueryReducer;
import
org.apache.ignite.internal.processors.cache.query.reducer.UnsortedCacheQueryReducer;
import org.apache.ignite.internal.util.lang.GridPlainCallable;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import static
org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.INDEX;
@@ -92,7 +94,7 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends
GridCacheQueryFutu
qryMgr = (GridCacheDistributedQueryManager<K, V>)ctx.queries();
if (qry.query().partition() != null)
- nodes = Collections.singletonList(node(nodes));
+ nodes = Collections.singletonList(cctx.isReplicated() ?
localOrRemoteNode(nodes) : F.first(nodes));
streams = new ConcurrentHashMap<>(nodes.size());
@@ -118,18 +120,23 @@ public class GridCacheDistributedQueryFuture<K, V, R>
extends GridCacheQueryFutu
}
/**
- * @return Nodes for query execution.
+ * @return A local node if available, otherwise a random node from the given collection.
*/
- private ClusterNode node(Collection<ClusterNode> nodes) {
+ private ClusterNode localOrRemoteNode(Collection<ClusterNode> nodes) {
+ int remoteNodeIdx = ThreadLocalRandom.current().nextInt(nodes.size());
+
ClusterNode rmtNode = null;
for (ClusterNode node : nodes) {
if (node.isLocal())
return node;
- rmtNode = node;
+ if (remoteNodeIdx-- == 0)
+ rmtNode = node;
}
+ assert rmtNode != null;
+
return rmtNode;
}
@@ -282,7 +289,7 @@ public class GridCacheDistributedQueryFuture<K, V, R>
extends GridCacheQueryFutu
GridCacheQueryRequest req =
GridCacheQueryRequest.cancelRequest(cctx, reqId);
if (nodeId.equals(cctx.localNodeId())) {
- // Process cancel query directly (without sending) for local node,
+ // Process cancel query directly (without sending) for local node.
cctx.closures().callLocalSafe(new GridPlainCallable<Object>() {
@Override public Object call() {
qryMgr.processQueryRequest(cctx.localNodeId(), req);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 8fe2c925bb1..2db3ed5263e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -1121,11 +1121,11 @@ public abstract class GridCacheQueryManager<K, V>
extends GridCacheManagerAdapte
int[] parts = null;
if (part != null) {
- final GridDhtLocalPartition locPart =
cctx.dht().topology().localPartition(part);
+ AffinityTopologyVersion topVer =
cctx.affinity().affinityTopologyVersion();
- if (locPart == null || locPart.state() != OWNING) {
+ if (cctx.isPartitioned() &&
!cctx.affinity().primaryByPartition(cctx.localNode(), part, topVer)) {
throw new CacheInvalidStateException("Failed to execute
index query because required partition " +
- "has not been found on local node [cacheName=" +
cctx.name() + ", part=" + part + "]");
+ "is not primary on local node [cacheName=" +
cctx.name() + ", part=" + part + ", topVer=" + topVer + ']');
}
parts = new int[] {part};
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryPartitionTest.java
b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryPartitionTest.java
index 73ea7f30532..8fe205ef7d9 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryPartitionTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryPartitionTest.java
@@ -78,6 +78,7 @@ public class IndexQueryPartitionTest extends
GridCommonAbstractTest {
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
.setCacheMode(cacheMode)
.setIndexedTypes(Integer.class, Person.class)
+ .setBackups(1)
.setAffinity(new RendezvousAffinityFunction().setPartitions(100));
cfg.setCacheConfiguration(ccfg);
@@ -136,7 +137,7 @@ public class IndexQueryPartitionTest extends
GridCommonAbstractTest {
}
}
- assertEquals(sendReq,
TestRecordingCommunicationSpi.spi(grid()).recordedMessages(true).size());
+ assertEquals("part=" + part, sendReq,
TestRecordingCommunicationSpi.spi(grid()).recordedMessages(true).size());
}
}
@@ -181,7 +182,7 @@ public class IndexQueryPartitionTest extends
GridCommonAbstractTest {
GridTestUtils.assertThrows(null, () ->
grid().cache("CACHE").query(qry).getAll(),
client ? IgniteException.class :
CacheInvalidStateException.class,
client ? "Failed to execute local index query on a client
node." :
- "Failed to execute index query because required
partition has not been found on local node");
+ "Failed to execute index query because required
partition is not primary on local node");
}
else
assertTrue(!grid().cache("CACHE").query(qry).getAll().isEmpty());