This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new c4b72191b35 IGNITE-23562 SQL Calcite: Fix partition reservation for 
index-count-scan - Fixes #11651.
c4b72191b35 is described below

commit c4b72191b35dccf3cef00958ee1496a0f25cdb49
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Thu Dec 5 18:27:57 2024 +0300

    IGNITE-23562 SQL Calcite: Fix partition reservation for index-count-scan - 
Fixes #11651.
    
    Signed-off-by: Aleksey Plekhanov <[email protected]>
---
 .../query/calcite/exec/AbstractCacheScan.java      |   4 +-
 .../query/calcite/exec/IndexCountScan.java         | 197 +++++++++++++++++++++
 .../query/calcite/exec/LogicalRelImplementor.java  |   4 +-
 .../query/calcite/schema/CacheIndexImpl.java       | 147 +--------------
 .../query/calcite/schema/IgniteIndex.java          |   4 +-
 .../query/calcite/schema/SystemViewIndexImpl.java  |   5 +-
 .../integration/AbstractBasicIntegrationTest.java  |   4 +-
 .../AbstractBasicIntegrationTransactionalTest.java |   4 +-
 .../PartitionsReservationIntegrationTest.java      | 142 +++++++++++++++
 .../UnstableTopologyIntegrationTest.java}          |  41 ++---
 .../ignite/testsuites/IntegrationTestSuite.java    |   6 +-
 11 files changed, 376 insertions(+), 182 deletions(-)

diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheScan.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheScan.java
index b35290e1bd9..a8126fbce4d 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheScan.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheScan.java
@@ -119,7 +119,7 @@ public abstract class AbstractCacheScan<Row> implements 
Iterable<Row>, AutoClose
         else
             toReserve = Collections.emptyList();
 
-        reserved = new ArrayList<>(toReserve.size());
+        List<GridDhtLocalPartition> reserved = new 
ArrayList<>(toReserve.size());
 
         try {
             for (GridDhtLocalPartition part : toReserve) {
@@ -140,6 +140,8 @@ public abstract class AbstractCacheScan<Row> implements 
Iterable<Row>, AutoClose
             throw e;
         }
         finally {
+            this.reserved = reserved;
+
             top.readUnlock();
         }
     }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexCountScan.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexCountScan.java
new file mode 100644
index 00000000000..79995facb0e
--- /dev/null
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexCountScan.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.cache.query.index.sorted.IndexRow;
+import 
org.apache.ignite.internal.cache.query.index.sorted.InlineIndexRowHandler;
+import 
org.apache.ignite.internal.cache.query.index.sorted.inline.IndexQueryContext;
+import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndex;
+import org.apache.ignite.internal.cache.query.index.sorted.keys.NullIndexKey;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import 
org.apache.ignite.internal.processors.cache.transactions.TransactionChanges;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.spi.indexing.IndexingQueryFilter;
+import org.apache.ignite.spi.indexing.IndexingQueryFilterImpl;
+import org.jetbrains.annotations.Nullable;
+
+/** Scan that produces a single row holding the number of index records, optionally excluding null values. */
+public class IndexCountScan<Row> extends AbstractCacheScan<Row> {
+    /** Index whose records are counted. */
+    private final InlineIndex idx;
+
+    /** Index collation; the null direction of its first field is used by the not-null row filter. */
+    private final RelCollation collation;
+
+    /** Whether to exclude null values from the count. */
+    private final boolean notNull;
+
+    /** Creates the scan; {@code ectx}, {@code cctx} and {@code parts} are passed to the superclass constructor. */
+    public IndexCountScan(
+        ExecutionContext<Row> ectx,
+        GridCacheContext<?, ?> cctx,
+        int[] parts,
+        InlineIndex idx,
+        RelCollation collation,
+        boolean notNull
+    ) {
+        super(ectx, cctx, parts);
+
+        this.idx = idx;
+        this.collation = collation;
+        this.notNull = notNull;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Iterator<Row> createIterator() {
+        boolean[] skipCheck = new boolean[] {false}; // One-element array: mutable flag shared with the row filter.
+
+        BPlusTree.TreeRowClosure<IndexRow, IndexRow> rowFilter = 
countRowFilter(skipCheck, notNull, idx);
+
+        long cnt = 0;
+
+        if (!F.isEmpty(ectx.getQryTxEntries())) {
+            TransactionChanges<CacheDataRow> txChanges = 
ectx.transactionChanges(
+                cctx.cacheId(),
+                parts,
+                Function.identity(),
+                null
+            );
+
+            if (!txChanges.changedKeysEmpty()) {
+                rowFilter = transactionAwareCountRowFilter(rowFilter, 
txChanges);
+
+                cnt = countTransactionRows(notNull, idx, 
txChanges.newAndUpdatedEntries());
+            }
+        }
+
+        try {
+            IndexingQueryFilter filter = new 
IndexingQueryFilterImpl(cctx.kernalContext(), topVer, parts);
+
+            for (int i = 0; i < idx.segmentsCount(); ++i) {
+                cnt += idx.count(i, new IndexQueryContext(filter, rowFilter));
+
+                skipCheck[0] = false; // Reset the shortcut flag before counting the next segment.
+            }
+
+            return 
Collections.singletonList(ectx.rowHandler().factory(long.class).create(cnt)).iterator();
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException("Unable to count index records.", e);
+        }
+    }
+
+    /** Builds the tree row filter used for counting; returns {@code null} when no filtering is required. */
+    private @Nullable BPlusTree.TreeRowClosure<IndexRow, IndexRow> 
countRowFilter(boolean[] skipCheck, boolean notNull, InlineIndex iidx) {
+        boolean checkExpired = !cctx.config().isEagerTtl();
+
+        if (notNull) {
+            boolean nullsFirst = 
collation.getFieldCollations().get(0).nullDirection == 
RelFieldCollation.NullDirection.FIRST;
+
+            BPlusTree.TreeRowClosure<IndexRow, IndexRow> notNullRowFilter = 
IndexScan.createNotNullRowFilter(iidx, checkExpired);
+
+            return new BPlusTree.TreeRowClosure<>() {
+                @Override public boolean apply(
+                    BPlusTree<IndexRow, IndexRow> tree,
+                    BPlusIO<IndexRow> io,
+                    long pageAddr,
+                    int idx
+                ) throws IgniteCheckedException {
+                    // If we have NULLS-FIRST collation, all values after 
first not-null value will be not-null,
+                    // don't need to check it with notNullRowFilter.
+                    // In case of NULL-LAST collation, all values after first 
null value will be null,
+                    // don't need to check it too.
+                    if (skipCheck[0] && !checkExpired)
+                        return nullsFirst;
+
+                    boolean res = notNullRowFilter.apply(tree, io, pageAddr, 
idx);
+
+                    if (res == nullsFirst)
+                        skipCheck[0] = true;
+
+                    return res;
+                }
+
+                @Override public IndexRow lastRow() {
+                    return (skipCheck[0] && !checkExpired)
+                        ? null
+                        : notNullRowFilter.lastRow();
+                }
+            };
+        }
+
+        return checkExpired ? IndexScan.createNotExpiredRowFilter() : null;
+    }
+
+    /** Wraps {@code rowFilter}, also rejecting rows for which {@code txChanges.remove(key)} returns {@code true}. */
+    private static BPlusTree.TreeRowClosure<IndexRow, IndexRow> 
transactionAwareCountRowFilter(
+        BPlusTree.TreeRowClosure<IndexRow, IndexRow> rowFilter,
+        TransactionChanges<CacheDataRow> txChanges
+    ) {
+        return new BPlusTree.TreeRowClosure<>() {
+            @Override public boolean apply(
+                BPlusTree<IndexRow, IndexRow> tree,
+                BPlusIO<IndexRow> io,
+                long pageAddr,
+                int idx
+            ) throws IgniteCheckedException {
+                if (rowFilter != null && !rowFilter.apply(tree, io, pageAddr, 
idx))
+                    return false;
+
+                if (txChanges.changedKeysEmpty())
+                    return true;
+
+                IndexRow row = rowFilter == null ? null : rowFilter.lastRow();
+
+                if (row == null)
+                    row = tree.getRow(io, pageAddr, idx);
+
+                // Intentional use of `remove` here.
+                // We want to perform as few `key` as possible.
+                // So we break some rules here to optimize work with the data 
provided by the tree.
+                return !txChanges.remove(row.cacheDataRow().key());
+            }
+        };
+    }
+
+    /** Counts {@code changedRows}, skipping rows whose index key 0 is null when {@code notNull} is set. */
+    private static long countTransactionRows(boolean notNull, InlineIndex 
iidx, List<CacheDataRow> changedRows) {
+        InlineIndexRowHandler rowHnd = iidx.segment(0).rowHandler();
+
+        long cnt = 0;
+
+        for (CacheDataRow txRow : changedRows) {
+            if (rowHnd.indexKey(0, txRow) == NullIndexKey.INSTANCE && notNull)
+                continue;
+
+            cnt++;
+        }
+
+        return cnt;
+    }
+}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
index 6b5a2661939..30737feed7a 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.query.calcite.exec;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Objects;
@@ -444,8 +443,7 @@ public class LogicalRelImplementor<Row> implements 
IgniteRelVisitor<Node<Row>> {
 
         if (idx != null && !tbl.isIndexRebuildInProgress()) {
             return new ScanStorageNode<>(idx.name() + "_COUNT", ctx, 
rel.getRowType(),
-                () -> 
Collections.singletonList(ctx.rowHandler().factory(ctx.getTypeFactory(), 
rel.getRowType())
-                    .create(idx.count(ctx, ctx.group(rel.sourceId()), 
rel.notNull()))).iterator());
+                idx.count(ctx, ctx.group(rel.sourceId()), rel.notNull()));
         }
         else {
             CollectNode<Row> replacement = 
CollectNode.createCountCollector(ctx);
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheIndexImpl.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheIndexImpl.java
index a6496d38baa..c09544f96fa 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheIndexImpl.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheIndexImpl.java
@@ -20,34 +20,23 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
-import java.util.function.Function;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.rel.RelCollation;
-import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.ImmutableIntList;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.cache.query.index.Index;
 import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyDefinition;
 import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyType;
 import 
org.apache.ignite.internal.cache.query.index.sorted.IndexKeyTypeSettings;
-import org.apache.ignite.internal.cache.query.index.sorted.IndexRow;
-import 
org.apache.ignite.internal.cache.query.index.sorted.InlineIndexRowHandler;
-import 
org.apache.ignite.internal.cache.query.index.sorted.inline.IndexQueryContext;
 import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndex;
 import 
org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndexImpl;
 import 
org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndexKeyType;
 import 
org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndexKeyTypeRegistry;
-import org.apache.ignite.internal.cache.query.index.sorted.keys.NullIndexKey;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
-import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
-import 
org.apache.ignite.internal.processors.cache.transactions.TransactionChanges;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.IndexCountScan;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.IndexFirstLastScan;
 import org.apache.ignite.internal.processors.query.calcite.exec.IndexScan;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.RangeIterable;
@@ -56,10 +45,6 @@ import 
org.apache.ignite.internal.processors.query.calcite.prepare.bounds.Search
 import 
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalIndexScan;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.processors.query.calcite.util.RexUtils;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.spi.indexing.IndexingQueryFilter;
-import org.apache.ignite.spi.indexing.IndexingQueryFilterImpl;
-import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -158,139 +143,15 @@ public class CacheIndexImpl implements IgniteIndex {
     }
 
     /** {@inheritDoc} */
-    @Override public long count(ExecutionContext<?> ectx, ColocationGroup grp, 
boolean notNull) {
+    @Override public <Row> Iterable<Row> count(ExecutionContext<Row> ectx, 
ColocationGroup grp, boolean notNull) {
         if (idx == null || !grp.nodeIds().contains(ectx.localNodeId()))
-            return 0;
+            return 
Collections.singletonList(ectx.rowHandler().factory(long.class).create(0L));
 
         int[] locParts = grp.partitions(ectx.localNodeId());
 
         InlineIndex iidx = idx.unwrap(InlineIndex.class);
 
-        boolean[] skipCheck = new boolean[] {false};
-
-        BPlusTree.TreeRowClosure<IndexRow, IndexRow> rowFilter = 
countRowFilter(skipCheck, notNull, iidx);
-
-        long cnt = 0;
-
-        if (!F.isEmpty(ectx.getQryTxEntries())) {
-            TransactionChanges<CacheDataRow> txChanges = 
ectx.transactionChanges(
-                iidx.indexDefinition().cacheInfo().cacheId(),
-                locParts,
-                Function.identity(),
-                null
-            );
-
-            if (!txChanges.changedKeysEmpty()) {
-                rowFilter = transactionAwareCountRowFilter(rowFilter, 
txChanges);
-
-                cnt = countTransactionRows(notNull, iidx, 
txChanges.newAndUpdatedEntries());
-            }
-        }
-
-        try {
-            IndexingQueryFilter filter = new 
IndexingQueryFilterImpl(tbl.descriptor().cacheContext().kernalContext(),
-                ectx.topologyVersion(), locParts);
-
-            for (int i = 0; i < iidx.segmentsCount(); ++i) {
-                cnt += iidx.count(i, new IndexQueryContext(filter, rowFilter));
-
-                skipCheck[0] = false;
-            }
-
-            return cnt;
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException("Unable to count index records.", e);
-        }
-    }
-
-    /** */
-    private @Nullable BPlusTree.TreeRowClosure<IndexRow, IndexRow> 
countRowFilter(boolean[] skipCheck, boolean notNull, InlineIndex iidx) {
-        boolean checkExpired = 
!tbl.descriptor().cacheContext().config().isEagerTtl();
-
-        if (notNull) {
-            boolean nullsFirst = 
collation.getFieldCollations().get(0).nullDirection == 
RelFieldCollation.NullDirection.FIRST;
-
-            BPlusTree.TreeRowClosure<IndexRow, IndexRow> notNullRowFilter = 
IndexScan.createNotNullRowFilter(iidx, checkExpired);
-
-            return new BPlusTree.TreeRowClosure<>() {
-                @Override public boolean apply(
-                    BPlusTree<IndexRow, IndexRow> tree,
-                    BPlusIO<IndexRow> io,
-                    long pageAddr,
-                    int idx
-                ) throws IgniteCheckedException {
-                    // If we have NULLS-FIRST collation, all values after 
first not-null value will be not-null,
-                    // don't need to check it with notNullRowFilter.
-                    // In case of NULL-LAST collation, all values after first 
null value will be null,
-                    // don't need to check it too.
-                    if (skipCheck[0] && !checkExpired)
-                        return nullsFirst;
-
-                    boolean res = notNullRowFilter.apply(tree, io, pageAddr, 
idx);
-
-                    if (res == nullsFirst)
-                        skipCheck[0] = true;
-
-                    return res;
-                }
-
-                @Override public IndexRow lastRow() {
-                    return (skipCheck[0] && !checkExpired)
-                        ? null
-                        : notNullRowFilter.lastRow();
-                }
-            };
-        }
-
-        return checkExpired ? IndexScan.createNotExpiredRowFilter() : null;
-    }
-
-    /** */
-    private static @NotNull BPlusTree.TreeRowClosure<IndexRow, IndexRow> 
transactionAwareCountRowFilter(
-        BPlusTree.TreeRowClosure<IndexRow, IndexRow> rowFilter,
-        TransactionChanges<CacheDataRow> txChanges
-    ) {
-        return new BPlusTree.TreeRowClosure<>() {
-            @Override public boolean apply(
-                BPlusTree<IndexRow, IndexRow> tree,
-                BPlusIO<IndexRow> io,
-                long pageAddr,
-                int idx
-            ) throws IgniteCheckedException {
-                if (rowFilter != null && !rowFilter.apply(tree, io, pageAddr, 
idx))
-                    return false;
-
-                if (txChanges.changedKeysEmpty())
-                    return true;
-
-                IndexRow row = rowFilter == null ? null : rowFilter.lastRow();
-
-                if (row == null)
-                    row = tree.getRow(io, pageAddr, idx);
-
-                // Intentionally use of `remove` here.
-                // We want to perform as few `key` as possible.
-                // So we break some rules here to optimize work with the data 
provided by the tree.
-                return !txChanges.remove(row.cacheDataRow().key());
-            }
-        };
-    }
-
-    /** */
-    private static long countTransactionRows(boolean notNull, InlineIndex 
iidx, List<CacheDataRow> changedRows) {
-        InlineIndexRowHandler rowHnd = iidx.segment(0).rowHandler();
-
-        long cnt = 0;
-
-        for (CacheDataRow txRow : changedRows) {
-            if (rowHnd.indexKey(0, txRow) == NullIndexKey.INSTANCE && notNull)
-                continue;
-
-            cnt++;
-        }
-
-        return cnt;
+        return new IndexCountScan<>(ectx, tbl.descriptor().cacheContext(), 
locParts, iidx, collation, notNull);
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteIndex.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteIndex.java
index 618a78920a2..903d4eddf5d 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteIndex.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteIndex.java
@@ -88,9 +88,9 @@ public interface IgniteIndex {
      * @param ectx Execution context.
      * @param grp  Colocation group.
      * @param notNull Exclude null values.
-     * @return Index records number for {@code group}.
+     * @return Iterable with one row and one col: records count by index for 
{@code group}.
      */
-    public long count(ExecutionContext<?> ectx, ColocationGroup grp, boolean 
notNull);
+    public <Row> Iterable<Row> count(ExecutionContext<Row> ectx, 
ColocationGroup grp, boolean notNull);
 
     /**
      * Takes only first or last not-null index value.
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewIndexImpl.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewIndexImpl.java
index 38657fec1e6..1a8abd64aea 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewIndexImpl.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewIndexImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.ignite.internal.processors.query.calcite.schema;
 
+import java.util.Collections;
 import java.util.List;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptTable;
@@ -93,10 +94,10 @@ public class SystemViewIndexImpl implements IgniteIndex {
     }
 
     /** {@inheritDoc} */
-    @Override public long count(ExecutionContext<?> ectx, ColocationGroup grp, 
boolean notNull) {
+    @Override public <Row> Iterable<Row> count(ExecutionContext<Row> ectx, 
ColocationGroup grp, boolean notNull) {
         assert !notNull; // Collation is empty, cannot come here with 
"notNull" flag.
 
-        return tbl.descriptor().systemView().size();
+        return 
Collections.singletonList(ectx.rowHandler().factory(long.class).create(tbl.descriptor().systemView().size()));
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
index 8451d68d277..81e6024b56b 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
@@ -144,7 +144,7 @@ public class AbstractBasicIntegrationTest extends 
GridCommonAbstractTest {
     }
 
     /** */
-    protected QueryChecker assertQuery(IgniteEx ignite, String qry) {
+    protected QueryChecker assertQuery(Ignite ignite, String qry) {
         return new QueryChecker(qry) {
             @Override protected QueryEngine getEngine() {
                 return Commons.lookupComponent(((IgniteEx)ignite).context(), 
QueryEngine.class);
@@ -301,7 +301,7 @@ public class AbstractBasicIntegrationTest extends 
GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Override public long count(ExecutionContext<?> ectx, ColocationGroup 
grp, boolean notNull) {
+        @Override public <Row> Iterable<Row> count(ExecutionContext<Row> ectx, 
ColocationGroup grp, boolean notNull) {
             return delegate.count(ectx, grp, notNull);
         }
 
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTransactionalTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTransactionalTest.java
index e193a8b785e..f35e2c29a01 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTransactionalTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTransactionalTest.java
@@ -113,10 +113,10 @@ public abstract class 
AbstractBasicIntegrationTransactionalTest extends Abstract
     }
 
     /** {@inheritDoc} */
-    @Override protected QueryChecker assertQuery(IgniteEx ignite, String qry) {
+    @Override protected QueryChecker assertQuery(Ignite ignite, String qry) {
         return new QueryChecker(qry, tx, sqlTxMode) {
             @Override protected QueryEngine getEngine() {
-                return Commons.lookupComponent(ignite.context(), 
QueryEngine.class);
+                return Commons.lookupComponent(((IgniteEx)ignite).context(), 
QueryEngine.class);
             }
         };
     }
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/PartitionsReservationIntegrationTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/PartitionsReservationIntegrationTest.java
new file mode 100644
index 00000000000..242fe33b596
--- /dev/null
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/PartitionsReservationIntegrationTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterTopologyException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.processors.query.calcite.QueryChecker;
+import org.apache.ignite.internal.processors.query.calcite.exec.IndexScan;
+import org.apache.ignite.internal.processors.query.calcite.exec.TableScan;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexCount;
+import org.apache.ignite.internal.util.typedef.X;
+import org.junit.Test;
+
+/**
+ * Tests partition reservation/releasing for queries over unstable topology.
+ */
+public class PartitionsReservationIntegrationTest extends 
AbstractBasicIntegrationTest {
+    /** Partitions count passed to the cache affinity function. */
+    private static final int PARTS = 16;
+
+    /** Number of entries loaded into the cache before the queries are run. */
+    private static final int KEYS = PARTS * 100_000;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        // No-op. Don't start any grids.
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        startGrids(2);
+
+        client = startClientGrid();
+
+        awaitPartitionMapExchange();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setCacheConfiguration(new CacheConfiguration<Integer, 
Integer>(DEFAULT_CACHE_NAME)
+            .setIndexedTypes(Integer.class, Employer.class)
+            .setSqlSchema(QueryUtils.DFLT_SCHEMA)
+            .setAffinity(new RendezvousAffinityFunction(false, PARTS)));
+
+        return cfg;
+    }
+
+    /** Runs the scenario for a COUNT query whose plan contains {@code IgniteIndexCount}. */
+    @Test
+    public void testIndexCount() throws Exception {
+        checkPartitionsReservationRelease(
+            assertQuery("SELECT COUNT(_key) FROM Employer")
+                
.matches(QueryChecker.containsSubPlan(IgniteIndexCount.class.getSimpleName()))
+                .ordered() // To avoid modification of QueryChecker.
+                .returns((long)KEYS));
+    }
+
+    /** Runs the scenario for a query whose plan contains {@code IndexScan}. */
+    @Test
+    public void testIndexScan() throws Exception {
+        checkPartitionsReservationRelease(
+            assertQuery("SELECT /*+ FORCE_INDEX */ * FROM Employer")
+                
.matches(QueryChecker.containsSubPlan(IndexScan.class.getSimpleName()))
+                .resultSize(KEYS));
+    }
+
+    /** Runs the scenario for a query whose plan contains {@code TableScan}. */
+    @Test
+    public void testTableScan() throws Exception {
+        checkPartitionsReservationRelease(
+            assertQuery("SELECT * FROM Employer")
+                
.matches(QueryChecker.containsSubPlan(TableScan.class.getSimpleName()))
+                .resultSize(KEYS));
+    }
+
+    /** Loads data, then runs {@code checker} in 10 threads while two more grids are started. */
+    public void checkPartitionsReservationRelease(QueryChecker checker) throws 
Exception {
+        try (IgniteDataStreamer<Object, Object> streamer = 
client.dataStreamer(DEFAULT_CACHE_NAME)) {
+            for (int i = 0; i < KEYS; i++)
+                streamer.addData(i, new Employer("name" + i, (double)i));
+        }
+
+        AtomicBoolean stop = new AtomicBoolean();
+
+        IgniteInternalFuture<?> qryFut = multithreadedAsync(new Runnable() {
+            @Override public void run() {
+                while (!stop.get()) {
+                    try {
+                        checker.check();
+                    }
+                    catch (IgniteSQLException e) {
+                        if (X.hasCause(e, ClusterTopologyException.class))
+                            continue; // Expected when topology changed while 
query starts.
+
+                        throw e;
+                    }
+                }
+            }
+        }, 10, "qry-thread");
+
+        // Trigger rebalance and partitions eviction.
+        startGrid(2);
+        startGrid(3);
+
+        awaitPartitionMapExchange();
+
+        doSleep(1_000L);
+
+        stop.set(true);
+
+        qryFut.get();
+    }
+}
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/UnstableTopologyTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/UnstableTopologyIntegrationTest.java
similarity index 86%
rename from 
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/UnstableTopologyTest.java
rename to 
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/UnstableTopologyIntegrationTest.java
index b1e5897690e..120642ee2b1 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/UnstableTopologyTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/UnstableTopologyIntegrationTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite;
+package org.apache.ignite.internal.processors.query.calcite.integration;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -38,23 +38,19 @@ import org.apache.ignite.cache.QueryIndexType;
 import org.apache.ignite.cluster.ClusterState;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.processors.query.QueryEngine;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.spi.IgniteSpiAdapter;
 import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
 import org.apache.ignite.spi.indexing.IndexingSpi;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.jetbrains.annotations.Nullable;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-/** Non stable topology tests. */
+/** Non-stable topology tests. */
 @RunWith(Parameterized.class)
-public class UnstableTopologyTest extends GridCommonAbstractTest {
+public class UnstableTopologyIntegrationTest extends AbstractBasicIntegrationTest {
     /** */
     private static final String POI_CACHE_NAME = "POI_CACHE";
 
@@ -101,10 +97,18 @@ public class UnstableTopologyTest extends GridCommonAbstractTest {
     @Parameterized.Parameter(1)
     public boolean idxSlowDown;
 
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        // No-op. We don't need to start anything.
+    }
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
+        if (idxSlowDown)
+            cfg.setIndexingSpi(new BlockingIndexingSpi());
+
         cfg.setCacheConfiguration(new CacheConfiguration<>(POI_CACHE_NAME)
             .setAtomicityMode(CacheAtomicityMode.ATOMIC)
             .setSqlSchema(POI_SCHEMA_NAME)
@@ -149,22 +153,19 @@ public class UnstableTopologyTest extends GridCommonAbstractTest {
      */
     @Test
     public void testSelectCorrectnessOnUnstableTopology() throws Exception {
-        ignitionStart(0, idxSlowDown);
-        IgniteEx ig = ignitionStart(1, idxSlowDown);
+        startGrids(2);
 
-        loadData(ig, 0, NUM_ENTITIES);
+        loadData(grid(1), 0, NUM_ENTITIES);
 
-        ignitionStart(2, idxSlowDown);
+        startGrid(2);
 
         if (awaitExchange)
             awaitPartitionMapExchange(true, true, null);
         else
             awaitPartitionMapExchange();
 
-        QueryEngine engine = Commons.lookupComponent(grid(1).context(), QueryEngine.class);
-
-        G.allGrids().forEach(g -> assertEquals(NUM_ENTITIES, engine.query(null, POI_SCHEMA_NAME,
-            "SELECT * FROM " + POI_TABLE_NAME).get(0).getAll().size()));
+        G.allGrids().forEach(g -> assertQuery(g, "SELECT * FROM " + POI_SCHEMA_NAME + '.' + POI_TABLE_NAME)
+            .resultSize(NUM_ENTITIES).check());
     }
 
     /** */
@@ -184,16 +185,6 @@ public class UnstableTopologyTest extends GridCommonAbstractTest {
         }
     }
 
-    /** Start with custon indexing SPI. */
-    private IgniteEx ignitionStart(int idx, boolean slow) throws Exception {
-        IgniteConfiguration cfg = getConfiguration(getTestIgniteInstanceName(idx));
-
-        if (slow)
-            cfg.setIndexingSpi(new BlockingIndexingSpi());
-
-        return startGrid(cfg);
-    }
-
     /**
      * Simple blocking indexing SPI.
      */
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
index a062d3d3169..9117c21a6ec 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
@@ -22,7 +22,6 @@ import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor
 import org.apache.ignite.internal.processors.query.calcite.CancelTest;
 import org.apache.ignite.internal.processors.query.calcite.IndexWithSameNameCalciteTest;
 import org.apache.ignite.internal.processors.query.calcite.SqlFieldsQueryUsageTest;
-import org.apache.ignite.internal.processors.query.calcite.UnstableTopologyTest;
 import org.apache.ignite.internal.processors.query.calcite.integration.AggregatesIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.integration.AuthorizationIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.integration.CalciteBasicSecondaryIndexIntegrationTest;
@@ -53,6 +52,7 @@ import org.apache.ignite.internal.processors.query.calcite.integration.MemoryQuo
 import org.apache.ignite.internal.processors.query.calcite.integration.MetadataIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.integration.OperatorsExtensionIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.integration.PartitionPruneTest;
+import org.apache.ignite.internal.processors.query.calcite.integration.PartitionsReservationIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.integration.QueryEngineConfigurationIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.integration.QueryMetadataIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.integration.QueryWithPartitionsIntegrationTest;
@@ -67,6 +67,7 @@ import org.apache.ignite.internal.processors.query.calcite.integration.SystemVie
 import org.apache.ignite.internal.processors.query.calcite.integration.TableDdlIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.integration.TableDmlIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.integration.TimeoutIntegrationTest;
+import org.apache.ignite.internal.processors.query.calcite.integration.UnstableTopologyIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.integration.UserDdlIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.integration.UserDefinedFunctionsIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.integration.UserDefinedFunctionsIntegrationTransactionalTest;
@@ -116,7 +117,8 @@ import org.junit.runners.Suite;
     IndexScanlIntegrationTest.class,
     IndexScanMultiNodeIntegrationTest.class,
     SetOpIntegrationTest.class,
-    UnstableTopologyTest.class,
+    UnstableTopologyIntegrationTest.class,
+    PartitionsReservationIntegrationTest.class,
     JoinCommuteRulesTest.class,
     ServerStatisticsIntegrationTest.class,
     JoinIntegrationTest.class,


Reply via email to