This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new af5d1ddbee2 IGNITE-23975 SQL Calcite: Add group partitions reservation 
- Fixes #11758.
af5d1ddbee2 is described below

commit af5d1ddbee2a6216e9f5a6b17b499df22b1e9aa0
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Fri Dec 27 21:22:05 2024 +0300

    IGNITE-23975 SQL Calcite: Add group partitions reservation - Fixes #11758.
    
    Signed-off-by: Aleksey Plekhanov <[email protected]>
---
 .../query/calcite/exec/AbstractCacheScan.java      |  94 ++---
 .../processors/query/calcite/exec/IndexScan.java   |   2 +-
 .../processors/query/calcite/exec/TableScan.java   |   4 +-
 .../query/calcite/metadata/ColocationGroup.java    |  54 ++-
 .../calcite/metadata/FragmentDescription.java      |   5 +-
 .../calcite/schema/CacheTableDescriptorImpl.java   |  23 +-
 .../dht/topology}/PartitionReservation.java        |   2 +-
 .../dht/topology}/PartitionReservationKey.java     |   2 +-
 .../dht/topology/PartitionReservationManager.java  | 443 +++++++++++++++++++++
 .../processors/query/GridQueryProcessor.java       |  15 +
 .../processors/query/h2/IgniteH2Indexing.java      |   8 +-
 .../processors/query/h2/opt/QueryContext.java      |   2 +-
 .../query/h2/twostep/GridMapQueryExecutor.java     |   1 +
 .../h2/twostep/PartitionReservationManager.java    | 375 -----------------
 .../internal/processors/query/KillQueryTest.java   |   4 +-
 .../h2/twostep/RetryCauseMessageSelfTest.java      |   2 +
 16 files changed, 582 insertions(+), 454 deletions(-)

diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheScan.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheScan.java
index a8126fbce4d..1f46ed0d2da 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheScan.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheScan.java
@@ -18,17 +18,18 @@
 package org.apache.ignite.internal.processors.query.calcite.exec;
 
 import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+import java.util.stream.IntStream;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterTopologyException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
-import org.apache.ignite.internal.util.typedef.F;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionReservation;
 
 /** */
 public abstract class AbstractCacheScan<Row> implements Iterable<Row>, 
AutoCloseable {
@@ -45,15 +46,35 @@ public abstract class AbstractCacheScan<Row> implements 
Iterable<Row>, AutoClose
     protected final int[] parts;
 
     /** */
-    protected volatile List<GridDhtLocalPartition> reserved;
+    protected final boolean explicitParts;
+
+    /** */
+    private PartitionReservation reservation;
+
+    /** */
+    protected volatile List<GridDhtLocalPartition> reservedParts;
 
     /** */
     AbstractCacheScan(ExecutionContext<Row> ectx, GridCacheContext<?, ?> cctx, 
int[] parts) {
         this.ectx = ectx;
         this.cctx = cctx;
-        this.parts = parts;
 
         topVer = ectx.topologyVersion();
+
+        explicitParts = parts != null;
+
+        if (cctx.isReplicated())
+            this.parts = IntStream.range(0, 
cctx.affinity().partitions()).toArray();
+        else {
+            if (parts != null)
+                this.parts = parts;
+            else {
+                Collection<Integer> primaryParts = 
cctx.affinity().primaryPartitions(
+                    cctx.kernalContext().localNodeId(), topVer);
+
+                this.parts = 
primaryParts.stream().mapToInt(Integer::intValue).toArray();
+            }
+        }
     }
 
     /** {@inheritDoc} */
@@ -80,7 +101,7 @@ public abstract class AbstractCacheScan<Row> implements 
Iterable<Row>, AutoClose
 
     /** */
     private synchronized void reserve() {
-        if (reserved != null)
+        if (reservation != null)
             return;
 
         GridDhtPartitionTopology top = cctx.topology();
@@ -98,61 +119,42 @@ public abstract class AbstractCacheScan<Row> implements 
Iterable<Row>, AutoClose
             throw new ClusterTopologyException("Topology was changed. Please 
retry on stable topology.");
         }
 
-        List<GridDhtLocalPartition> toReserve;
-
-        if (cctx.isReplicated()) {
-            int partsCnt = cctx.affinity().partitions();
-
-            toReserve = new ArrayList<>(partsCnt);
-
-            for (int i = 0; i < partsCnt; i++)
-                toReserve.add(top.localPartition(i));
-        }
-        else if (cctx.isPartitioned()) {
-            assert parts != null;
+        try {
+            PartitionReservation reservation;
 
-            toReserve = new ArrayList<>(parts.length);
+            try {
+                reservation = 
cctx.kernalContext().query().partitionReservationManager().reservePartitions(
+                    cctx, topVer, explicitParts ? parts : null, 
ectx.originatingNodeId(), "qryId=" + ectx.queryId());
+            }
+            catch (IgniteCheckedException e) {
+                throw new ClusterTopologyException("Failed to reserve 
partition for query execution", e);
+            }
 
-            for (int i = 0; i < parts.length; i++)
-                toReserve.add(top.localPartition(parts[i]));
-        }
-        else
-            toReserve = Collections.emptyList();
+            if (reservation.failed()) {
+                reservation.release();
 
-        List<GridDhtLocalPartition> reserved = new 
ArrayList<>(toReserve.size());
+                throw new ClusterTopologyException(reservation.error());
+            }
 
-        try {
-            for (GridDhtLocalPartition part : toReserve) {
-                if (part == null || !part.reserve())
-                    throw new ClusterTopologyException("Failed to reserve 
partition for query execution. Retry on stable topology.");
-                else if (part.state() != GridDhtPartitionState.OWNING) {
-                    part.release();
+            this.reservation = reservation;
 
-                    throw new ClusterTopologyException("Failed to reserve 
partition for query execution. Retry on stable topology.");
-                }
+            List<GridDhtLocalPartition> reservedParts = new 
ArrayList<>(parts.length);
 
-                reserved.add(part);
-            }
-        }
-        catch (Exception e) {
-            release();
+            for (int i = 0; i < parts.length; i++)
+                reservedParts.add(top.localPartition(parts[i]));
 
-            throw e;
+            this.reservedParts = reservedParts;
         }
         finally {
-            this.reserved = reserved;
-
             top.readUnlock();
         }
     }
 
     /** */
     private synchronized void release() {
-        if (F.isEmpty(reserved))
-            return;
-
-        reserved.forEach(GridDhtLocalPartition::release);
+        if (reservation != null)
+            reservation.release();
 
-        reserved = null;
+        reservation = null;
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
index 2077996aa09..d61f302b0ab 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
@@ -116,7 +116,7 @@ public class IndexScan<Row> extends 
AbstractCacheColumnsScan<Row> {
 
             txChanges = ectx.transactionChanges(
                 cctx.cacheId(),
-                parts,
+                cctx.isReplicated() ? null : this.parts,
                 r -> new IndexRowImpl(rowHnd, r),
                 this::compare
             );
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java
index f008f3f6af3..c16252b902a 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java
@@ -74,9 +74,9 @@ public class TableScan<Row> extends 
AbstractCacheColumnsScan<Row> {
 
         /** */
         private IteratorImpl() {
-            assert reserved != null;
+            assert reservedParts != null;
 
-            parts = new ArrayDeque<>(reserved);
+            parts = new ArrayDeque<>(reservedParts);
 
             txChanges = F.isEmpty(ectx.getQryTxEntries())
                 ? TransactionChanges.empty()
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
index c5e652a00af..af9fadc5723 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
@@ -58,6 +58,13 @@ public class ColocationGroup implements MarshalableMessage {
     @GridDirectTransient
     private List<List<UUID>> assignments;
 
+    /**
+     * Flag indicating that the assignment is formed by the original cache assignment 
for the given topology.
+     * If {@code true}, assignment marshalling can be skipped and the assignment 
is calculated on remote nodes instead.
+     */
+    @GridDirectTransient
+    private boolean primaryAssignment;
+
     /** Marshalled assignments. */
     private int[] marshalledAssignments;
 
@@ -68,7 +75,7 @@ public class ColocationGroup implements MarshalableMessage {
 
     /** */
     public static ColocationGroup forAssignments(List<List<UUID>> assignments) 
{
-        return new ColocationGroup(null, null, assignments);
+        return new ColocationGroup(null, null, assignments, true);
     }
 
     /** */
@@ -100,6 +107,13 @@ public class ColocationGroup implements MarshalableMessage 
{
         this.assignments = assignments;
     }
 
+    /** */
+    private ColocationGroup(long[] sourceIds, List<UUID> nodeIds, 
List<List<UUID>> assignments, boolean primaryAssignment) {
+        this(sourceIds, nodeIds, assignments);
+
+        this.primaryAssignment = primaryAssignment;
+    }
+
     /**
      * @return Lists of nodes capable to execute a query fragment for what the 
mapping is calculated.
      */
@@ -143,10 +157,10 @@ public class ColocationGroup implements 
MarshalableMessage {
      */
     public ColocationGroup colocate(ColocationGroup other) throws 
ColocationMappingException {
         long[] srcIds;
-        if (this.sourceIds == null || other.sourceIds == null)
-            srcIds = U.firstNotNull(this.sourceIds, other.sourceIds);
+        if (sourceIds == null || other.sourceIds == null)
+            srcIds = U.firstNotNull(sourceIds, other.sourceIds);
         else
-            srcIds = LongStream.concat(Arrays.stream(this.sourceIds), 
Arrays.stream(other.sourceIds)).distinct().toArray();
+            srcIds = LongStream.concat(Arrays.stream(sourceIds), 
Arrays.stream(other.sourceIds)).distinct().toArray();
 
         List<UUID> nodeIds;
         if (this.nodeIds == null || other.nodeIds == null)
@@ -159,6 +173,8 @@ public class ColocationGroup implements MarshalableMessage {
                 "Replicated query parts are not co-located on all nodes");
         }
 
+        boolean primaryAssignment = this.primaryAssignment || 
other.primaryAssignment;
+
         List<List<UUID>> assignments;
         if (this.assignments == null || other.assignments == null) {
             assignments = U.firstNotNull(this.assignments, other.assignments);
@@ -170,11 +186,14 @@ public class ColocationGroup implements 
MarshalableMessage {
                 for (int i = 0; i < assignments.size(); i++) {
                     List<UUID> assignment = Commons.intersect(filter, 
assignments.get(i));
 
-                    if (assignment.isEmpty()) { // TODO check with partition 
filters
+                    if (assignment.isEmpty()) {
                         throw new ColocationMappingException("Failed to map 
fragment to location. " +
                             "Partition mapping is empty [part=" + i + "]");
                     }
 
+                    if (!assignment.get(0).equals(assignments.get(i).get(0)))
+                        primaryAssignment = false;
+
                     assignments0.add(assignment);
                 }
 
@@ -191,14 +210,20 @@ public class ColocationGroup implements 
MarshalableMessage {
                 if (filter != null)
                     assignment.retainAll(filter);
 
-                if (assignment.isEmpty()) // TODO check with partition filters
-                    throw new ColocationMappingException("Failed to map 
fragment to location. Partition mapping is empty [part=" + i + "]");
+                if (assignment.isEmpty()) {
+                    throw new ColocationMappingException("Failed to map 
fragment to location. " +
+                        "Partition mapping is empty [part=" + i + "]");
+                }
+
+                if (!assignment.get(0).equals(this.assignments.get(i).get(0))
+                    || 
!assignment.get(0).equals(other.assignments.get(i).get(0)))
+                    primaryAssignment = false;
 
                 assignments.add(assignment);
             }
         }
 
-        return new ColocationGroup(srcIds, nodeIds, assignments);
+        return new ColocationGroup(srcIds, nodeIds, assignments, 
primaryAssignment);
     }
 
     /** */
@@ -216,7 +241,16 @@ public class ColocationGroup implements MarshalableMessage 
{
             assignments.add(first != null ? Collections.singletonList(first) : 
Collections.emptyList());
         }
 
-        return new ColocationGroup(sourceIds, new ArrayList<>(nodes), 
assignments);
+        return new ColocationGroup(sourceIds, new ArrayList<>(nodes), 
assignments, primaryAssignment);
+    }
+
+    /** */
+    public ColocationGroup explicitMapping() {
+        if (assignments == null || !primaryAssignment)
+            return this;
+
+        // Make a shallow copy without the primaryAssignment flag.
+        return new ColocationGroup(sourceIds, nodeIds, assignments, false);
     }
 
     /** */
@@ -359,7 +393,7 @@ public class ColocationGroup implements MarshalableMessage {
 
     /** {@inheritDoc} */
     @Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) {
-        if (assignments != null && marshalledAssignments == null) {
+        if (assignments != null && marshalledAssignments == null && 
!primaryAssignment) {
             Map<UUID, Integer> nodeIdxs = new HashMap<>();
 
             for (int i = 0; i < nodeIds.size(); i++)
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentDescription.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentDescription.java
index dfef48e3346..f1dc050aa82 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentDescription.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentDescription.java
@@ -192,8 +192,11 @@ public class FragmentDescription implements 
MarshalableMessage {
         if (mapping != null)
             mapping.prepareMarshal(ctx);
 
-        if (target != null)
+        if (target != null) {
+            target = target.explicitMapping();
+
             target.prepareMarshal(ctx);
+        }
 
         if (remoteSources0 == null && remoteSources != null) {
             remoteSources0 = U.newHashMap(remoteSources.size());
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java
index ab704a6850d..11dca7c96ff 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java
@@ -599,18 +599,25 @@ public class CacheTableDescriptorImpl extends 
NullInitializerExpressionFactory
         List<ClusterNode> nodes = 
cctx.discovery().discoCache(topVer).cacheGroupAffinityNodes(cctx.groupId());
         List<UUID> nodes0;
 
-        if (!top.rebalanceFinished(topVer)) {
-            nodes0 = new ArrayList<>(nodes.size());
+        top.readLock();
 
-            int parts = top.partitions();
+        try {
+            if (!top.rebalanceFinished(topVer)) {
+                nodes0 = new ArrayList<>(nodes.size());
+
+                int parts = top.partitions();
 
-            for (ClusterNode node : nodes) {
-                if (isOwner(node.id(), top, parts))
-                    nodes0.add(node.id());
+                for (ClusterNode node : nodes) {
+                    if (isOwner(node.id(), top, parts))
+                        nodes0.add(node.id());
+                }
             }
+            else
+                nodes0 = Commons.transform(nodes, ClusterNode::id);
+        }
+        finally {
+            top.readUnlock();
         }
-        else
-            nodes0 = Commons.transform(nodes, ClusterNode::id);
 
         return ColocationGroup.forNodes(nodes0);
     }
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservation.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionReservation.java
similarity index 96%
rename from 
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservation.java
rename to 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionReservation.java
index b5593e58d8e..ae21192819d 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservation.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionReservation.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.h2.twostep;
+package org.apache.ignite.internal.processors.cache.distributed.dht.topology;
 
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationKey.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionReservationKey.java
similarity index 96%
rename from 
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationKey.java
rename to 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionReservationKey.java
index 0fad2c4dd09..60911e7ac84 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationKey.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionReservationKey.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.h2.twostep;
+package org.apache.ignite.internal.processors.cache.distributed.dht.topology;
 
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.util.typedef.F;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionReservationManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionReservationManager.java
new file mode 100644
index 00000000000..fa44fc6a10a
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionReservationManager.java
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.topology;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.tracing.MTC;
+import org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.F;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
+import static 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
+import static 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST;
+import static 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
+import static 
org.apache.ignite.internal.processors.tracing.SpanType.SQL_PARTITIONS_RESERVE;
+
+/**
+ * Class responsible for partition reservation for queries executed on local 
node. Prevents partitions from being
+ * evicted from node during query execution.
+ */
+public class PartitionReservationManager implements PartitionsExchangeAware {
+    /** Special instance of reservable object for REPLICATED caches. */
+    private static final ReplicatedReservable REPLICATED_RESERVABLE = new 
ReplicatedReservable();
+
+    /** Kernal context. */
+    private final GridKernalContext ctx;
+
+    /**
+     * Group reservations cache. When the affinity version has not changed and all 
primary partitions must be reserved, we get
+     * the group reservation from this map instead of creating a new reservation group.
+     */
+    private final ConcurrentMap<PartitionReservationKey, GridReservable> 
reservations = new ConcurrentHashMap<>();
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Context.
+     */
+    public PartitionReservationManager(GridKernalContext ctx) {
+        this.ctx = ctx;
+
+        log = ctx.log(PartitionReservationManager.class);
+
+        ctx.cache().context().exchange().registerExchangeAwareComponent(this);
+    }
+
+    /**
+     * @param top Partition topology.
+     * @param partId Partition ID.
+     * @return Partition.
+     */
+    private static GridDhtLocalPartition partition(GridDhtPartitionTopology 
top, int partId) {
+        return top.localPartition(partId, NONE, false);
+    }
+
+    /**
+     * @param cacheIds Cache IDs.
+     * @param reqTopVer Topology version from request.
+     * @param explicitParts Explicit partitions list.
+     * @param nodeId Node ID.
+     * @param reqId Request ID.
+     * @return PartitionReservation instance with reservation result.
+     * @throws IgniteCheckedException If failed.
+     */
+    public PartitionReservation reservePartitions(
+        @Nullable List<Integer> cacheIds,
+        AffinityTopologyVersion reqTopVer,
+        int[] explicitParts,
+        UUID nodeId,
+        long reqId
+    ) throws IgniteCheckedException {
+        try (TraceSurroundings ignored = 
MTC.support(ctx.tracing().create(SQL_PARTITIONS_RESERVE, MTC.span()))) {
+            assert reqTopVer != null;
+
+            AffinityTopologyVersion topVer = 
ctx.cache().context().exchange().lastAffinityChangedTopologyVersion(reqTopVer);
+
+            if (F.isEmpty(cacheIds))
+                return new PartitionReservation(Collections.emptyList());
+
+            Collection<Integer> partIds = partsToCollection(explicitParts);
+
+            List<GridReservable> reserved = new ArrayList<>();
+
+            for (int i = 0; i < cacheIds.size(); i++) {
+                GridCacheContext<?, ?> cctx = 
ctx.cache().context().cacheContext(cacheIds.get(i));
+
+                // Cache was not found, probably was not deployed yet.
+                if (cctx == null) {
+                    return new PartitionReservation(reserved,
+                        String.format("Failed to reserve partitions for query 
(cache is not " +
+                                "found on local node) [localNodeId=%s, 
rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s]",
+                            ctx.localNodeId(), nodeId, reqId, topVer, 
cacheIds.get(i)));
+                }
+
+                if (!cctx.rebalanceEnabled())
+                    continue;
+
+                String err = reservePartitions(reserved, cctx, partIds, 
topVer, nodeId, "reqId=" + reqId);
+
+                if (err != null)
+                    return new PartitionReservation(reserved, err);
+            }
+
+            return new PartitionReservation(reserved);
+        }
+    }
+
+    /**
+     * @param cctx Cache context.
+     * @param reqTopVer Topology version from request.
+     * @param explicitParts Explicit partitions list.
+     * @param nodeId Node ID.
+     * @param qryInfo Query info.
+     * @return PartitionReservation instance with reservation result.
+     * @throws IgniteCheckedException If failed.
+     */
+    public PartitionReservation reservePartitions(
+        GridCacheContext<?, ?> cctx,
+        AffinityTopologyVersion reqTopVer,
+        int[] explicitParts,
+        UUID nodeId,
+        String qryInfo
+    ) throws IgniteCheckedException {
+        try (TraceSurroundings ignored = 
MTC.support(ctx.tracing().create(SQL_PARTITIONS_RESERVE, MTC.span()))) {
+            assert reqTopVer != null;
+
+            AffinityTopologyVersion topVer = 
ctx.cache().context().exchange().lastAffinityChangedTopologyVersion(reqTopVer);
+
+            Collection<Integer> partIds = partsToCollection(explicitParts);
+
+            List<GridReservable> reserved = new ArrayList<>();
+
+            String err = reservePartitions(reserved, cctx, partIds, topVer, 
nodeId, qryInfo);
+
+            return new PartitionReservation(reserved, err);
+        }
+    }
+
+    /**
+     * @return Error message or {@code null}.
+     */
+    private @Nullable String reservePartitions(
+        List<GridReservable> reserved,
+        GridCacheContext<?, ?> cctx,
+        @Nullable Collection<Integer> explicitParts,
+        AffinityTopologyVersion topVer,
+        UUID nodeId,
+        String qryInfo
+    ) throws IgniteCheckedException {
+        // For replicated cache topology version does not make sense.
+        PartitionReservationKey grpKey = new 
PartitionReservationKey(cctx.name(), cctx.isReplicated() ? null : topVer);
+
+        GridReservable r = reservations.get(grpKey);
+
+        if (explicitParts == null && r != null)  // Try to reserve group 
partition if any and no explicits.
+            return groupPartitionReservation(reserved, r, cctx, topVer, 
nodeId, qryInfo);
+        else { // Try to reserve partitions one by one.
+            int partsCnt = cctx.affinity().partitions();
+
+            if (cctx.isReplicated()) { // Check all the partitions are in 
owning state for replicated cache.
+                if (r == null) { // Check only once.
+                    GridDhtPartitionTopology top = cctx.topology();
+
+                    top.readLock();
+
+                    try {
+                        for (int p = 0; p < partsCnt; p++) {
+                            GridDhtLocalPartition part = partition(top, p);
+
+                            // We don't need to reserve partitions because 
they will not be evicted in replicated caches.
+                            GridDhtPartitionState partState = part != null ? 
part.state() : null;
+
+                            if (partState != OWNING) {
+                                return String.format("Failed to reserve 
partitions for " +
+                                    "query (partition of REPLICATED cache is 
not in OWNING state) [" +
+                                    "localNodeId=%s, rmtNodeId=%s, %s, 
affTopVer=%s, cacheId=%s, " +
+                                    "cacheName=%s, part=%s, partFound=%s, 
partState=%s]",
+                                    ctx.localNodeId(),
+                                    nodeId,
+                                    qryInfo,
+                                    topVer,
+                                    cctx.cacheId(),
+                                    cctx.name(),
+                                    p,
+                                    (part != null),
+                                    partState
+                                );
+                            }
+                        }
+                    }
+                    finally {
+                        top.readUnlock();
+                    }
+
+                    // Mark that we checked this replicated cache.
+                    reservations.putIfAbsent(grpKey, REPLICATED_RESERVABLE);
+
+                    MTC.span().addLog(() -> "Cache partitions were reserved 
[cache=" + cctx.name() +
+                        ", partitions=[0.." + partsCnt + ']');
+                }
+            }
+            else { // Reserve primary partitions for partitioned cache (if no 
explicit given).
+                Collection<Integer> partIds = explicitParts != null ? 
explicitParts
+                    : cctx.affinity().primaryPartitions(ctx.localNodeId(), 
topVer);
+
+                int reservedCnt = 0;
+
+                GridDhtPartitionTopology top = cctx.topology();
+
+                top.readLock();
+
+                try {
+                    for (int partId : partIds) {
+                        GridDhtLocalPartition part = partition(top, partId);
+
+                        GridDhtPartitionState partState = part != null ? 
part.state() : null;
+
+                        if (partState != OWNING) {
+                            if (partState == LOST) {
+                                reserved.forEach(GridReservable::release);
+
+                                failQueryOnLostData(cctx, part);
+                            }
+                            else {
+                                return String.format("Failed to reserve 
partitions " +
+                                    "for query (partition of PARTITIONED cache 
is not found or not in OWNING " +
+                                    "state) [localNodeId=%s, rmtNodeId=%s, %s, 
affTopVer=%s, cacheId=%s, " +
+                                    "cacheName=%s, part=%s, partFound=%s, 
partState=%s]",
+                                    ctx.localNodeId(),
+                                    nodeId,
+                                    qryInfo,
+                                    topVer,
+                                    cctx.cacheId(),
+                                    cctx.name(),
+                                    partId,
+                                    (part != null),
+                                    partState
+                                );
+                            }
+                        }
+
+                        if (!part.reserve()) {
+                            return String.format("Failed to reserve partitions 
for query " +
+                                "(partition of PARTITIONED cache cannot be 
reserved) [" +
+                                "localNodeId=%s, rmtNodeId=%s, %s, 
affTopVer=%s, cacheId=%s, " +
+                                "cacheName=%s, part=%s, partFound=%s, 
partState=%s]",
+                                ctx.localNodeId(),
+                                nodeId,
+                                qryInfo,
+                                topVer,
+                                cctx.cacheId(),
+                                cctx.name(),
+                                partId,
+                                true,
+                                partState
+                            );
+                        }
+
+                        reserved.add(part);
+
+                        reservedCnt++;
+                    }
+                }
+                finally {
+                    top.readUnlock();
+                }
+
+                MTC.span().addLog(() -> "Cache partitions were reserved 
[cache=" + cctx.name() +
+                    ", partitions=" + partIds + ", topology=" + topVer + ']');
+
+                if (explicitParts == null && reservedCnt > 0) {
+                    // We reserved all the primary partitions for cache, 
attempt to add group reservation.
+                    GridDhtPartitionsReservation grp = new 
GridDhtPartitionsReservation(topVer, cctx, "SQL");
+
+                    synchronized (this) {
+                        // Double check under lock.
+                        GridReservable grpReservation = 
reservations.get(grpKey);
+
+                        if (grpReservation != null)
+                            return groupPartitionReservation(reserved, 
grpReservation, cctx, topVer, nodeId, qryInfo);
+                        else {
+                            if (grp.register(reserved.subList(reserved.size() 
- reservedCnt, reserved.size()))) {
+                                reservations.put(grpKey, grp);
+
+                                grp.onPublish(new CI1<>() {
+                                    @Override public void 
apply(GridDhtPartitionsReservation r) {
+                                        reservations.remove(grpKey, r);
+                                    }
+                                });
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Drops every cached group reservation registered for the given cache.
+     *
+     * @param cacheName Name of the stopped cache.
+     */
+    public void onCacheStop(String cacheName) {
+        reservations.keySet().removeIf(grpKey -> F.eq(grpKey.cacheName(), cacheName));
+    }
+
+    /**
+     * Aborts the query by throwing an exception that reports a lost cache partition.
+     *
+     * @param cctx Cache context of the affected cache.
+     * @param part Partition that has been lost.
+     * @throws IgniteCheckedException Always, as a {@link CacheInvalidStateException}.
+     */
+    private static void failQueryOnLostData(
+        GridCacheContext<?, ?> cctx,
+        GridDhtLocalPartition part
+    ) throws IgniteCheckedException {
+        String msg = "Failed to execute query because cache partition has been " +
+            "lost [cacheName=" + cctx.name() + ", part=" + part + ']';
+
+        throw new CacheInvalidStateException(msg);
+    }
+
+    /**
+     * Cleans up the group reservations cache after an exchange completes.
+     * <p>
+     * Every cached group reservation whose key topology version differs from the last affinity-changed
+     * topology version is invalidated. The work is dispatched to the management pool instead of being
+     * done in the exchange thread.
+     *
+     * @param fut Completed exchange future.
+     */
+    @Override public void onDoneAfterTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+        try {
+            // Must not do anything at the exchange thread. Dispatch to the management thread pool.
+            ctx.closure().runLocal(
+                new GridPlainRunnable() {
+                    @Override public void run() {
+                        AffinityTopologyVersion topVer = ctx.cache().context().exchange()
+                            .lastAffinityChangedTopologyVersion(fut.topologyVersion());
+
+                        reservations.forEach((key, r) -> {
+                            // Replicated markers carry no topology version and have nothing to invalidate.
+                            if (r != REPLICATED_RESERVABLE && !F.eq(key.topologyVersion(), topVer)) {
+                                assert r instanceof GridDhtPartitionsReservation;
+
+                                ((GridDhtPartitionsReservation)r).invalidate();
+                            }
+                        });
+                    }
+                },
+                GridIoPolicy.MANAGEMENT_POOL);
+        }
+        catch (Throwable e) {
+            // Pass the cause to the logger so the root failure is not lost.
+            log.error("Unexpected exception on start reservations cleanup.", e);
+            ctx.failure().process(new FailureContext(CRITICAL_ERROR, e));
+        }
+    }
+
+    /**
+     * Boxes an explicit partitions array into a collection of partition IDs.
+     *
+     * @param explicitParts Explicit partitions, may be {@code null}.
+     * @return {@code null} if {@code explicitParts} is {@code null}, an empty list for an empty array,
+     *      otherwise a new list holding the same partition IDs in the same order.
+     */
+    private static Collection<Integer> partsToCollection(int[] explicitParts) {
+        if (explicitParts == null)
+            return null;
+
+        if (explicitParts.length == 0)
+            return Collections.emptyList();
+
+        List<Integer> res = new ArrayList<>(explicitParts.length);
+
+        for (int i = 0; i < explicitParts.length; i++)
+            res.add(explicitParts[i]);
+
+        return res;
+    }
+
+    /**
+     * Tries to reserve a group reservation taken from the reservations cache.
+     * <p>
+     * The replicated-cache marker is skipped: replicated cache partitions do not need to be reserved.
+     *
+     * @param reserved Objects reserved so far; the group reservation is appended on success.
+     * @param grpReservation Cached group reservation (or the replicated-cache marker).
+     * @param cctx Cache context.
+     * @param topVer Affinity topology version.
+     * @param nodeId Remote node ID, reported in the failure message.
+     * @param qryInfo Query description, reported in the failure message.
+     * @return {@code null} on success, otherwise a message describing why the reservation failed.
+     */
+    private String groupPartitionReservation(
+        List<GridReservable> reserved,
+        GridReservable grpReservation,
+        GridCacheContext<?, ?> cctx,
+        AffinityTopologyVersion topVer,
+        UUID nodeId,
+        String qryInfo
+    ) {
+        // Nothing to reserve for replicated caches.
+        if (grpReservation == REPLICATED_RESERVABLE)
+            return null;
+
+        if (grpReservation.reserve()) {
+            reserved.add(grpReservation);
+
+            MTC.span().addLog(() -> "Cache partitions were reserved " + grpReservation);
+
+            return null;
+        }
+
+        return String.format("Failed to reserve partitions for query (group " +
+            "reservation failed) [localNodeId=%s, rmtNodeId=%s, %s, affTopVer=%s, cacheId=%s, " +
+            "cacheName=%s]", ctx.localNodeId(), nodeId, qryInfo, topVer, cctx.cacheId(), cctx.name());
+    }
+
+    /**
+     * Marker reservation object stored for replicated caches whose partitions have already been checked.
+     * It must never be reserved or released: both operations throw {@link IllegalStateException}.
+     */
+    private static final class ReplicatedReservable implements GridReservable {
+        /** {@inheritDoc} */
+        @Override public boolean reserve() {
+            throw new IllegalStateException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void release() {
+            throw new IllegalStateException();
+        }
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 78b47d4f0fd..255910bfcf0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -90,6 +90,7 @@ import 
org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
 import 
org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionReservationManager;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
@@ -308,6 +309,9 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
     /** Global schema SQL views manager. */
     private final SchemaSqlViewManager schemaSqlViewMgr;
 
+    /** Partition reservation manager. */
+    private final PartitionReservationManager partReservationMgr;
+
     /** @see TransactionConfiguration#isTxAwareQueriesEnabled()  */
     private final boolean txAwareQueriesEnabled;
 
@@ -331,6 +335,8 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
 
         schemaSqlViewMgr = new SchemaSqlViewManager(ctx);
 
+        partReservationMgr = new PartitionReservationManager(ctx);
+
         idxProc = ctx.indexProcessor();
 
         idxQryPrc = new IndexQueryProcessor(idxProc);
@@ -1060,6 +1066,11 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
         return runningQryMgr;
     }
 
+    /** @return Partition reservation manager. */
+    public PartitionReservationManager partitionReservationManager() {
+        return partReservationMgr;
+    }
+
     /**
      * Create type descriptors from schema and initialize indexing for given 
cache.<p>
      * Use with {@link #busyLock} where appropriate.
@@ -1327,6 +1338,8 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
 
         try {
             if (schemaMgr.clearCacheContext(cacheInfo.cacheContext())) {
+                partReservationMgr.onCacheStop(cacheInfo.name());
+
                 if (idx != null)
                     idx.unregisterCache(cacheInfo);
             }
@@ -2448,6 +2461,8 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
 
                 schemaMgr.onCacheStopped(cacheName, destroy, clearIdx);
 
+                partReservationMgr.onCacheStop(cacheName);
+
                 // Notify indexing.
                 if (idx != null)
                     idx.unregisterCache(cacheInfo);
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index fa0804b555a..86ee2d79e8a 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -65,6 +65,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheContextInfo;
 import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.IgniteClusterReadOnlyException;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionReservationManager;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import 
org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
@@ -97,7 +98,6 @@ import 
org.apache.ignite.internal.processors.query.h2.sql.GridSqlConst;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlStatement;
 import 
org.apache.ignite.internal.processors.query.h2.twostep.GridMapQueryExecutor;
 import 
org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor;
-import 
org.apache.ignite.internal.processors.query.h2.twostep.PartitionReservationManager;
 import 
org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest;
 import 
org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse;
 import 
org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest;
@@ -1565,7 +1565,7 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
 
         this.ctx = ctx;
 
-        partReservationMgr = new PartitionReservationManager(ctx);
+        partReservationMgr = ctx.query().partitionReservationManager();
 
         connMgr = new ConnectionManager(ctx);
 
@@ -1839,10 +1839,6 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
 
     /** {@inheritDoc} */
     @Override public void unregisterCache(GridCacheContextInfo<?, ?> 
cacheInfo) {
-        String cacheName = cacheInfo.name();
-
-        partReservationMgr.onCacheStop(cacheName);
-
         // Unregister connection.
         connMgr.onCacheDestroyed();
 
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContext.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContext.java
index 8930a89fd8c..1b0d4511f0f 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContext.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContext.java
@@ -18,8 +18,8 @@
 package org.apache.ignite.internal.processors.query.h2.opt;
 
 import java.util.Objects;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionReservation;
 import 
org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinContext;
-import 
org.apache.ignite.internal.processors.query.h2.twostep.PartitionReservation;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
 import org.jetbrains.annotations.Nullable;
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 4bd579a7eb4..a3b5d3a9c90 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.metric.IoStatisticsHolder;
 import org.apache.ignite.internal.metric.IoStatisticsQueryHelper;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionReservation;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationManager.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationManager.java
deleted file mode 100644
index d092c3cd2d8..00000000000
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationManager.java
+++ /dev/null
@@ -1,375 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2.twostep;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.failure.FailureContext;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.managers.communication.GridIoPolicy;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionsReservation;
-import org.apache.ignite.internal.processors.tracing.MTC;
-import org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings;
-import org.apache.ignite.internal.util.lang.GridPlainRunnable;
-import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.F;
-import org.jetbrains.annotations.Nullable;
-
-import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
-import static 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
-import static 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST;
-import static 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
-import static 
org.apache.ignite.internal.processors.tracing.SpanType.SQL_PARTITIONS_RESERVE;
-
-/**
- * Class responsible for partition reservation for queries executed on local 
node. Prevents partitions from being
- * evicted from node during query execution.
- */
-public class PartitionReservationManager implements PartitionsExchangeAware {
-    /** Special instance of reservable object for REPLICATED caches. */
-    private static final ReplicatedReservable REPLICATED_RESERVABLE = new 
ReplicatedReservable();
-
-    /** Kernal context. */
-    private final GridKernalContext ctx;
-
-    /**
-     * Group reservations cache. When affinity version is not changed and all 
primary partitions must be reserved we get
-     * group reservation from this map instead of create new reservation group.
-     */
-    private final ConcurrentMap<PartitionReservationKey, GridReservable> 
reservations = new ConcurrentHashMap<>();
-
-    /** Logger. */
-    private final IgniteLogger log;
-
-    /**
-     * Constructor.
-     *
-     * @param ctx Context.
-     */
-    public PartitionReservationManager(GridKernalContext ctx) {
-        this.ctx = ctx;
-
-        log = ctx.log(PartitionReservationManager.class);
-
-        ctx.cache().context().exchange().registerExchangeAwareComponent(this);
-    }
-
-    /**
-     * @param cctx Cache context.
-     * @param p Partition ID.
-     * @return Partition.
-     */
-    private static GridDhtLocalPartition partition(GridCacheContext<?, ?> 
cctx, int p) {
-        return cctx.topology().localPartition(p, NONE, false);
-    }
-
-    /**
-     * @param cacheIds Cache IDs.
-     * @param reqTopVer Topology version from request.
-     * @param explicitParts Explicit partitions list.
-     * @param nodeId Node ID.
-     * @param reqId Request ID.
-     * @return String which is null in case of success or with causeMessage if 
failed
-     * @throws IgniteCheckedException If failed.
-     */
-    public PartitionReservation reservePartitions(
-        @Nullable List<Integer> cacheIds,
-        AffinityTopologyVersion reqTopVer,
-        final int[] explicitParts,
-        UUID nodeId,
-        long reqId
-    ) throws IgniteCheckedException {
-        try (TraceSurroundings ignored = 
MTC.support(ctx.tracing().create(SQL_PARTITIONS_RESERVE, MTC.span()))) {
-            assert reqTopVer != null;
-
-            AffinityTopologyVersion topVer = 
ctx.cache().context().exchange().lastAffinityChangedTopologyVersion(reqTopVer);
-
-            if (F.isEmpty(cacheIds))
-                return new PartitionReservation(Collections.emptyList());
-
-            Collection<Integer> partIds;
-
-            if (explicitParts == null)
-                partIds = null;
-            else if (explicitParts.length == 0)
-                partIds = Collections.emptyList();
-            else {
-                partIds = new ArrayList<>(explicitParts.length);
-
-                for (int explicitPart : explicitParts)
-                    partIds.add(explicitPart);
-            }
-
-            List<GridReservable> reserved = new ArrayList<>();
-
-            for (int i = 0; i < cacheIds.size(); i++) {
-                GridCacheContext<?, ?> cctx = 
ctx.cache().context().cacheContext(cacheIds.get(i));
-
-                // Cache was not found, probably was not deployed yet.
-                if (cctx == null) {
-                    return new PartitionReservation(reserved,
-                        String.format("Failed to reserve partitions for query 
(cache is not " +
-                                "found on local node) [localNodeId=%s, 
rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s]",
-                            ctx.localNodeId(), nodeId, reqId, topVer, 
cacheIds.get(i)));
-                }
-
-                if (!cctx.rebalanceEnabled())
-                    continue;
-
-                // For replicated cache topology version does not make sense.
-                final PartitionReservationKey grpKey = new 
PartitionReservationKey(cctx.name(), cctx.isReplicated() ? null : topVer);
-
-                GridReservable r = reservations.get(grpKey);
-
-                if (explicitParts == null && r != null) { // Try to reserve 
group partition if any and no explicits.
-                    if (r != REPLICATED_RESERVABLE) {
-                        if (!r.reserve())
-                            return new PartitionReservation(reserved,
-                                String.format("Failed to reserve partitions 
for query (group " +
-                                    "reservation failed) [localNodeId=%s, 
rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, " +
-                                    "cacheName=%s]", ctx.localNodeId(), 
nodeId, reqId, topVer, cacheIds.get(i), cctx.name()));
-
-                        reserved.add(r);
-
-                        MTC.span().addLog(() -> "Cache partitions were 
reserved " + r);
-                    }
-                }
-                else { // Try to reserve partitions one by one.
-                    int partsCnt = cctx.affinity().partitions();
-
-                    if (cctx.isReplicated()) { // Check all the partitions are 
in owning state for replicated cache.
-                        if (r == null) { // Check only once.
-                            for (int p = 0; p < partsCnt; p++) {
-                                GridDhtLocalPartition part = partition(cctx, 
p);
-
-                                // We don't need to reserve partitions because 
they will not be evicted in replicated caches.
-                                GridDhtPartitionState partState = part != null 
? part.state() : null;
-
-                                if (partState != OWNING)
-                                    return new PartitionReservation(reserved,
-                                        String.format("Failed to reserve 
partitions for " +
-                                                "query (partition of 
REPLICATED cache is not in OWNING state) [" +
-                                                "localNodeId=%s, rmtNodeId=%s, 
reqId=%s, affTopVer=%s, cacheId=%s, " +
-                                                "cacheName=%s, part=%s, 
partFound=%s, partState=%s]",
-                                            ctx.localNodeId(),
-                                            nodeId,
-                                            reqId,
-                                            topVer,
-                                            cacheIds.get(i),
-                                            cctx.name(),
-                                            p,
-                                            (part != null),
-                                            partState
-                                        ));
-                            }
-
-                            // Mark that we checked this replicated cache.
-                            reservations.putIfAbsent(grpKey, 
REPLICATED_RESERVABLE);
-
-                            MTC.span().addLog(() -> "Cache partitions were 
reserved [cache=" + cctx.name() +
-                                ", partitions=[0.." + partsCnt + ']');
-                        }
-                    }
-                    else { // Reserve primary partitions for partitioned cache 
(if no explicit given).
-                        if (explicitParts == null)
-                            partIds = 
cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer);
-
-                        int reservedCnt = 0;
-
-                        for (int partId : partIds) {
-                            GridDhtLocalPartition part = partition(cctx, 
partId);
-
-                            GridDhtPartitionState partState = part != null ? 
part.state() : null;
-
-                            if (partState != OWNING) {
-                                if (partState == LOST)
-                                    failQueryOnLostData(cctx, part);
-                                else {
-                                    return new PartitionReservation(reserved,
-                                        String.format("Failed to reserve 
partitions " +
-                                                "for query (partition of 
PARTITIONED cache is not found or not in OWNING " +
-                                                "state) [localNodeId=%s, 
rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, " +
-                                                "cacheName=%s, part=%s, 
partFound=%s, partState=%s]",
-                                            ctx.localNodeId(),
-                                            nodeId,
-                                            reqId,
-                                            topVer,
-                                            cacheIds.get(i),
-                                            cctx.name(),
-                                            partId,
-                                            (part != null),
-                                            partState
-                                        ));
-                                }
-                            }
-
-                            if (!part.reserve()) {
-                                return new PartitionReservation(reserved,
-                                    String.format("Failed to reserve 
partitions for query " +
-                                            "(partition of PARTITIONED cache 
cannot be reserved) [" +
-                                            "localNodeId=%s, rmtNodeId=%s, 
reqId=%s, affTopVer=%s, cacheId=%s, " +
-                                            "cacheName=%s, part=%s, 
partFound=%s, partState=%s]",
-                                        ctx.localNodeId(),
-                                        nodeId,
-                                        reqId,
-                                        topVer,
-                                        cacheIds.get(i),
-                                        cctx.name(),
-                                        partId,
-                                        true,
-                                        partState
-                                    ));
-                            }
-
-                            reserved.add(part);
-
-                            reservedCnt++;
-
-                            // Double check that we are still in owning state 
and partition contents are not cleared.
-                            partState = part.state();
-
-                            if (partState != OWNING) {
-                                if (partState == LOST)
-                                    failQueryOnLostData(cctx, part);
-                                else {
-                                    return new PartitionReservation(reserved,
-                                        String.format("Failed to reserve 
partitions for " +
-                                                "query (partition of 
PARTITIONED cache is not in OWNING state after " +
-                                                "reservation) [localNodeId=%s, 
rmtNodeId=%s, reqId=%s, affTopVer=%s, " +
-                                                "cacheId=%s, cacheName=%s, 
part=%s, partState=%s]",
-                                            ctx.localNodeId(),
-                                            nodeId,
-                                            reqId,
-                                            topVer,
-                                            cacheIds.get(i),
-                                            cctx.name(),
-                                            partId,
-                                            partState
-                                        ));
-                                }
-                            }
-                        }
-
-                        final Collection<Integer> finalPartIds = partIds;
-
-                        MTC.span().addLog(() -> "Cache partitions were 
reserved [cache=" + cctx.name() +
-                            ", partitions=" + finalPartIds + ", topology=" + 
topVer + ']');
-
-                        if (explicitParts == null && reservedCnt > 0) {
-                            // We reserved all the primary partitions for 
cache, attempt to add group reservation.
-                            GridDhtPartitionsReservation grp = new 
GridDhtPartitionsReservation(topVer, cctx, "SQL");
-
-                            if (grp.register(reserved.subList(reserved.size() 
- reservedCnt, reserved.size()))) {
-                                if (reservations.putIfAbsent(grpKey, grp) != 
null)
-                                    throw new 
IllegalStateException("Reservation already exists.");
-
-                                grp.onPublish(new 
CI1<GridDhtPartitionsReservation>() {
-                                    @Override public void 
apply(GridDhtPartitionsReservation r) {
-                                        reservations.remove(grpKey, r);
-                                    }
-                                });
-                            }
-                        }
-                    }
-                }
-            }
-
-            return new PartitionReservation(reserved);
-        }
-    }
-
-    /**
-     * @param cacheName Cache name.
-     */
-    public void onCacheStop(String cacheName) {
-        // Drop group reservations.
-        for (PartitionReservationKey grpKey : reservations.keySet()) {
-            if (F.eq(grpKey.cacheName(), cacheName))
-                reservations.remove(grpKey);
-        }
-    }
-
-    /**
-     * @param cctx Cache context.
-     * @param part Partition.
-     */
-    private static void failQueryOnLostData(GridCacheContext cctx, 
GridDhtLocalPartition part)
-        throws IgniteCheckedException {
-        throw new CacheInvalidStateException("Failed to execute query because 
cache partition has been " +
-            "lost [cacheName=" + cctx.name() + ", part=" + part + ']');
-    }
-
-    /**
-     * Cleanup group reservations cache on change affinity version.
-     */
-    @Override public void onDoneAfterTopologyUnlock(final 
GridDhtPartitionsExchangeFuture fut) {
-        try {
-            // Must not do anything at the exchange thread. Dispatch to the 
management thread pool.
-            ctx.closure().runLocal(
-                new GridPlainRunnable() {
-                    @Override public void run() {
-                        AffinityTopologyVersion topVer = 
ctx.cache().context().exchange()
-                            
.lastAffinityChangedTopologyVersion(fut.topologyVersion());
-
-                        reservations.forEach((key, r) -> {
-                            if (r != REPLICATED_RESERVABLE && 
!F.eq(key.topologyVersion(), topVer)) {
-                                assert r instanceof 
GridDhtPartitionsReservation;
-
-                                ((GridDhtPartitionsReservation)r).invalidate();
-                            }
-                        });
-                    }
-                },
-                GridIoPolicy.MANAGEMENT_POOL);
-        }
-        catch (Throwable e) {
-            log.error("Unexpected exception on start reservations cleanup.");
-            ctx.failure().process(new FailureContext(CRITICAL_ERROR, e));
-        }
-    }
-
-    /**
-     * Mapper fake reservation object for replicated caches.
-     */
-    private static class ReplicatedReservable implements GridReservable {
-        /** {@inheritDoc} */
-        @Override public boolean reserve() {
-            throw new IllegalStateException();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void release() {
-            throw new IllegalStateException();
-        }
-    }
-}
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java
index 0c30057fd8f..70847fabff3 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java
@@ -69,11 +69,11 @@ import 
org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionReservation;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionReservationManager;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import 
org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor;
-import 
org.apache.ignite.internal.processors.query.h2.twostep.PartitionReservation;
-import 
org.apache.ignite.internal.processors.query.h2.twostep.PartitionReservationManager;
 import 
org.apache.ignite.internal.processors.query.h2.twostep.ReducePartitionMapResult;
 import 
org.apache.ignite.internal.processors.query.h2.twostep.ReducePartitionMapper;
 import 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java
index 0f5491934e9..7b3c83e5bc9 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java
@@ -38,6 +38,8 @@ import 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionReservationKey;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionReservationManager;
 import 
org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
 import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;

Reply via email to