This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new c4b72191b35 IGNITE-23562 SQL Calcite: Fix partition reservation for
index-count-scan - Fixes #11651.
c4b72191b35 is described below
commit c4b72191b35dccf3cef00958ee1496a0f25cdb49
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Thu Dec 5 18:27:57 2024 +0300
IGNITE-23562 SQL Calcite: Fix partition reservation for index-count-scan -
Fixes #11651.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../query/calcite/exec/AbstractCacheScan.java | 4 +-
.../query/calcite/exec/IndexCountScan.java | 197 +++++++++++++++++++++
.../query/calcite/exec/LogicalRelImplementor.java | 4 +-
.../query/calcite/schema/CacheIndexImpl.java | 147 +--------------
.../query/calcite/schema/IgniteIndex.java | 4 +-
.../query/calcite/schema/SystemViewIndexImpl.java | 5 +-
.../integration/AbstractBasicIntegrationTest.java | 4 +-
.../AbstractBasicIntegrationTransactionalTest.java | 4 +-
.../PartitionsReservationIntegrationTest.java | 142 +++++++++++++++
.../UnstableTopologyIntegrationTest.java} | 41 ++---
.../ignite/testsuites/IntegrationTestSuite.java | 6 +-
11 files changed, 376 insertions(+), 182 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheScan.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheScan.java
index b35290e1bd9..a8126fbce4d 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheScan.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheScan.java
@@ -119,7 +119,7 @@ public abstract class AbstractCacheScan<Row> implements
Iterable<Row>, AutoClose
else
toReserve = Collections.emptyList();
- reserved = new ArrayList<>(toReserve.size());
+ List<GridDhtLocalPartition> reserved = new
ArrayList<>(toReserve.size());
try {
for (GridDhtLocalPartition part : toReserve) {
@@ -140,6 +140,8 @@ public abstract class AbstractCacheScan<Row> implements
Iterable<Row>, AutoClose
throw e;
}
finally {
+ this.reserved = reserved;
+
top.readUnlock();
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexCountScan.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexCountScan.java
new file mode 100644
index 00000000000..79995facb0e
--- /dev/null
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexCountScan.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.cache.query.index.sorted.IndexRow;
+import
org.apache.ignite.internal.cache.query.index.sorted.InlineIndexRowHandler;
+import
org.apache.ignite.internal.cache.query.index.sorted.inline.IndexQueryContext;
+import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndex;
+import org.apache.ignite.internal.cache.query.index.sorted.keys.NullIndexKey;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import
org.apache.ignite.internal.processors.cache.transactions.TransactionChanges;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.spi.indexing.IndexingQueryFilter;
+import org.apache.ignite.spi.indexing.IndexingQueryFilterImpl;
+import org.jetbrains.annotations.Nullable;
+
+/** */
+public class IndexCountScan<Row> extends AbstractCacheScan<Row> {
+ /** */
+ private final InlineIndex idx;
+
+ /** */
+ private final RelCollation collation;
+
+ /** */
+ private final boolean notNull;
+
+ /** */
+ public IndexCountScan(
+ ExecutionContext<Row> ectx,
+ GridCacheContext<?, ?> cctx,
+ int[] parts,
+ InlineIndex idx,
+ RelCollation collation,
+ boolean notNull
+ ) {
+ super(ectx, cctx, parts);
+
+ this.idx = idx;
+ this.collation = collation;
+ this.notNull = notNull;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Iterator<Row> createIterator() {
+ boolean[] skipCheck = new boolean[] {false};
+
+ BPlusTree.TreeRowClosure<IndexRow, IndexRow> rowFilter =
countRowFilter(skipCheck, notNull, idx);
+
+ long cnt = 0;
+
+ if (!F.isEmpty(ectx.getQryTxEntries())) {
+ TransactionChanges<CacheDataRow> txChanges =
ectx.transactionChanges(
+ cctx.cacheId(),
+ parts,
+ Function.identity(),
+ null
+ );
+
+ if (!txChanges.changedKeysEmpty()) {
+ rowFilter = transactionAwareCountRowFilter(rowFilter,
txChanges);
+
+ cnt = countTransactionRows(notNull, idx,
txChanges.newAndUpdatedEntries());
+ }
+ }
+
+ try {
+ IndexingQueryFilter filter = new
IndexingQueryFilterImpl(cctx.kernalContext(), topVer, parts);
+
+ for (int i = 0; i < idx.segmentsCount(); ++i) {
+ cnt += idx.count(i, new IndexQueryContext(filter, rowFilter));
+
+ skipCheck[0] = false;
+ }
+
+ return
Collections.singletonList(ectx.rowHandler().factory(long.class).create(cnt)).iterator();
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException("Unable to count index records.", e);
+ }
+ }
+
+ /** */
+ private @Nullable BPlusTree.TreeRowClosure<IndexRow, IndexRow>
countRowFilter(boolean[] skipCheck, boolean notNull, InlineIndex iidx) {
+ boolean checkExpired = !cctx.config().isEagerTtl();
+
+ if (notNull) {
+ boolean nullsFirst =
collation.getFieldCollations().get(0).nullDirection ==
RelFieldCollation.NullDirection.FIRST;
+
+ BPlusTree.TreeRowClosure<IndexRow, IndexRow> notNullRowFilter =
IndexScan.createNotNullRowFilter(iidx, checkExpired);
+
+ return new BPlusTree.TreeRowClosure<>() {
+ @Override public boolean apply(
+ BPlusTree<IndexRow, IndexRow> tree,
+ BPlusIO<IndexRow> io,
+ long pageAddr,
+ int idx
+ ) throws IgniteCheckedException {
+ // If we have NULLS-FIRST collation, all values after
first not-null value will be not-null,
+ // don't need to check it with notNullRowFilter.
+ // In case of NULLS-LAST collation, all values after first
null value will be null,
+ // don't need to check it either.
+ if (skipCheck[0] && !checkExpired)
+ return nullsFirst;
+
+ boolean res = notNullRowFilter.apply(tree, io, pageAddr,
idx);
+
+ if (res == nullsFirst)
+ skipCheck[0] = true;
+
+ return res;
+ }
+
+ @Override public IndexRow lastRow() {
+ return (skipCheck[0] && !checkExpired)
+ ? null
+ : notNullRowFilter.lastRow();
+ }
+ };
+ }
+
+ return checkExpired ? IndexScan.createNotExpiredRowFilter() : null;
+ }
+
+ /** */
+ private static BPlusTree.TreeRowClosure<IndexRow, IndexRow>
transactionAwareCountRowFilter(
+ BPlusTree.TreeRowClosure<IndexRow, IndexRow> rowFilter,
+ TransactionChanges<CacheDataRow> txChanges
+ ) {
+ return new BPlusTree.TreeRowClosure<>() {
+ @Override public boolean apply(
+ BPlusTree<IndexRow, IndexRow> tree,
+ BPlusIO<IndexRow> io,
+ long pageAddr,
+ int idx
+ ) throws IgniteCheckedException {
+ if (rowFilter != null && !rowFilter.apply(tree, io, pageAddr,
idx))
+ return false;
+
+ if (txChanges.changedKeysEmpty())
+ return true;
+
+ IndexRow row = rowFilter == null ? null : rowFilter.lastRow();
+
+ if (row == null)
+ row = tree.getRow(io, pageAddr, idx);
+
+ // Intentional use of `remove` here.
+ // We want to perform as few `key` calls as possible.
+ // So we break some rules here to optimize work with the data
provided by the tree.
+ return !txChanges.remove(row.cacheDataRow().key());
+ }
+ };
+ }
+
+ /** */
+ private static long countTransactionRows(boolean notNull, InlineIndex
iidx, List<CacheDataRow> changedRows) {
+ InlineIndexRowHandler rowHnd = iidx.segment(0).rowHandler();
+
+ long cnt = 0;
+
+ for (CacheDataRow txRow : changedRows) {
+ if (rowHnd.indexKey(0, txRow) == NullIndexKey.INSTANCE && notNull)
+ continue;
+
+ cnt++;
+ }
+
+ return cnt;
+ }
+}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
index 6b5a2661939..30737feed7a 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.query.calcite.exec;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
@@ -444,8 +443,7 @@ public class LogicalRelImplementor<Row> implements
IgniteRelVisitor<Node<Row>> {
if (idx != null && !tbl.isIndexRebuildInProgress()) {
return new ScanStorageNode<>(idx.name() + "_COUNT", ctx,
rel.getRowType(),
- () ->
Collections.singletonList(ctx.rowHandler().factory(ctx.getTypeFactory(),
rel.getRowType())
- .create(idx.count(ctx, ctx.group(rel.sourceId()),
rel.notNull()))).iterator());
+ idx.count(ctx, ctx.group(rel.sourceId()), rel.notNull()));
}
else {
CollectNode<Row> replacement =
CollectNode.createCountCollector(ctx);
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheIndexImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheIndexImpl.java
index a6496d38baa..c09544f96fa 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheIndexImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheIndexImpl.java
@@ -20,34 +20,23 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
-import java.util.function.Function;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelCollation;
-import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.ImmutableIntList;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.cache.query.index.Index;
import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyDefinition;
import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyType;
import
org.apache.ignite.internal.cache.query.index.sorted.IndexKeyTypeSettings;
-import org.apache.ignite.internal.cache.query.index.sorted.IndexRow;
-import
org.apache.ignite.internal.cache.query.index.sorted.InlineIndexRowHandler;
-import
org.apache.ignite.internal.cache.query.index.sorted.inline.IndexQueryContext;
import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndex;
import
org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndexImpl;
import
org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndexKeyType;
import
org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndexKeyTypeRegistry;
-import org.apache.ignite.internal.cache.query.index.sorted.keys.NullIndexKey;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
-import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
-import
org.apache.ignite.internal.processors.cache.transactions.TransactionChanges;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.IndexCountScan;
import
org.apache.ignite.internal.processors.query.calcite.exec.IndexFirstLastScan;
import org.apache.ignite.internal.processors.query.calcite.exec.IndexScan;
import
org.apache.ignite.internal.processors.query.calcite.exec.exp.RangeIterable;
@@ -56,10 +45,6 @@ import
org.apache.ignite.internal.processors.query.calcite.prepare.bounds.Search
import
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalIndexScan;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.processors.query.calcite.util.RexUtils;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.spi.indexing.IndexingQueryFilter;
-import org.apache.ignite.spi.indexing.IndexingQueryFilterImpl;
-import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
@@ -158,139 +143,15 @@ public class CacheIndexImpl implements IgniteIndex {
}
/** {@inheritDoc} */
- @Override public long count(ExecutionContext<?> ectx, ColocationGroup grp,
boolean notNull) {
+ @Override public <Row> Iterable<Row> count(ExecutionContext<Row> ectx,
ColocationGroup grp, boolean notNull) {
if (idx == null || !grp.nodeIds().contains(ectx.localNodeId()))
- return 0;
+ return
Collections.singletonList(ectx.rowHandler().factory(long.class).create(0L));
int[] locParts = grp.partitions(ectx.localNodeId());
InlineIndex iidx = idx.unwrap(InlineIndex.class);
- boolean[] skipCheck = new boolean[] {false};
-
- BPlusTree.TreeRowClosure<IndexRow, IndexRow> rowFilter =
countRowFilter(skipCheck, notNull, iidx);
-
- long cnt = 0;
-
- if (!F.isEmpty(ectx.getQryTxEntries())) {
- TransactionChanges<CacheDataRow> txChanges =
ectx.transactionChanges(
- iidx.indexDefinition().cacheInfo().cacheId(),
- locParts,
- Function.identity(),
- null
- );
-
- if (!txChanges.changedKeysEmpty()) {
- rowFilter = transactionAwareCountRowFilter(rowFilter,
txChanges);
-
- cnt = countTransactionRows(notNull, iidx,
txChanges.newAndUpdatedEntries());
- }
- }
-
- try {
- IndexingQueryFilter filter = new
IndexingQueryFilterImpl(tbl.descriptor().cacheContext().kernalContext(),
- ectx.topologyVersion(), locParts);
-
- for (int i = 0; i < iidx.segmentsCount(); ++i) {
- cnt += iidx.count(i, new IndexQueryContext(filter, rowFilter));
-
- skipCheck[0] = false;
- }
-
- return cnt;
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException("Unable to count index records.", e);
- }
- }
-
- /** */
- private @Nullable BPlusTree.TreeRowClosure<IndexRow, IndexRow>
countRowFilter(boolean[] skipCheck, boolean notNull, InlineIndex iidx) {
- boolean checkExpired =
!tbl.descriptor().cacheContext().config().isEagerTtl();
-
- if (notNull) {
- boolean nullsFirst =
collation.getFieldCollations().get(0).nullDirection ==
RelFieldCollation.NullDirection.FIRST;
-
- BPlusTree.TreeRowClosure<IndexRow, IndexRow> notNullRowFilter =
IndexScan.createNotNullRowFilter(iidx, checkExpired);
-
- return new BPlusTree.TreeRowClosure<>() {
- @Override public boolean apply(
- BPlusTree<IndexRow, IndexRow> tree,
- BPlusIO<IndexRow> io,
- long pageAddr,
- int idx
- ) throws IgniteCheckedException {
- // If we have NULLS-FIRST collation, all values after
first not-null value will be not-null,
- // don't need to check it with notNullRowFilter.
- // In case of NULL-LAST collation, all values after first
null value will be null,
- // don't need to check it too.
- if (skipCheck[0] && !checkExpired)
- return nullsFirst;
-
- boolean res = notNullRowFilter.apply(tree, io, pageAddr,
idx);
-
- if (res == nullsFirst)
- skipCheck[0] = true;
-
- return res;
- }
-
- @Override public IndexRow lastRow() {
- return (skipCheck[0] && !checkExpired)
- ? null
- : notNullRowFilter.lastRow();
- }
- };
- }
-
- return checkExpired ? IndexScan.createNotExpiredRowFilter() : null;
- }
-
- /** */
- private static @NotNull BPlusTree.TreeRowClosure<IndexRow, IndexRow>
transactionAwareCountRowFilter(
- BPlusTree.TreeRowClosure<IndexRow, IndexRow> rowFilter,
- TransactionChanges<CacheDataRow> txChanges
- ) {
- return new BPlusTree.TreeRowClosure<>() {
- @Override public boolean apply(
- BPlusTree<IndexRow, IndexRow> tree,
- BPlusIO<IndexRow> io,
- long pageAddr,
- int idx
- ) throws IgniteCheckedException {
- if (rowFilter != null && !rowFilter.apply(tree, io, pageAddr,
idx))
- return false;
-
- if (txChanges.changedKeysEmpty())
- return true;
-
- IndexRow row = rowFilter == null ? null : rowFilter.lastRow();
-
- if (row == null)
- row = tree.getRow(io, pageAddr, idx);
-
- // Intentionally use of `remove` here.
- // We want to perform as few `key` as possible.
- // So we break some rules here to optimize work with the data
provided by the tree.
- return !txChanges.remove(row.cacheDataRow().key());
- }
- };
- }
-
- /** */
- private static long countTransactionRows(boolean notNull, InlineIndex
iidx, List<CacheDataRow> changedRows) {
- InlineIndexRowHandler rowHnd = iidx.segment(0).rowHandler();
-
- long cnt = 0;
-
- for (CacheDataRow txRow : changedRows) {
- if (rowHnd.indexKey(0, txRow) == NullIndexKey.INSTANCE && notNull)
- continue;
-
- cnt++;
- }
-
- return cnt;
+ return new IndexCountScan<>(ectx, tbl.descriptor().cacheContext(),
locParts, iidx, collation, notNull);
}
/** {@inheritDoc} */
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteIndex.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteIndex.java
index 618a78920a2..903d4eddf5d 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteIndex.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteIndex.java
@@ -88,9 +88,9 @@ public interface IgniteIndex {
* @param ectx Execution context.
* @param grp Colocation group.
* @param notNull Exclude null values.
- * @return Index records number for {@code group}.
+ * @return Iterable with one row and one col: records count by index for
{@code group}.
*/
- public long count(ExecutionContext<?> ectx, ColocationGroup grp, boolean
notNull);
+ public <Row> Iterable<Row> count(ExecutionContext<Row> ectx,
ColocationGroup grp, boolean notNull);
/**
* Takes only first or last not-null index value.
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewIndexImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewIndexImpl.java
index 38657fec1e6..1a8abd64aea 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewIndexImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewIndexImpl.java
@@ -16,6 +16,7 @@
*/
package org.apache.ignite.internal.processors.query.calcite.schema;
+import java.util.Collections;
import java.util.List;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable;
@@ -93,10 +94,10 @@ public class SystemViewIndexImpl implements IgniteIndex {
}
/** {@inheritDoc} */
- @Override public long count(ExecutionContext<?> ectx, ColocationGroup grp,
boolean notNull) {
+ @Override public <Row> Iterable<Row> count(ExecutionContext<Row> ectx,
ColocationGroup grp, boolean notNull) {
assert !notNull; // Collation is empty, cannot come here with
"notNull" flag.
- return tbl.descriptor().systemView().size();
+ return
Collections.singletonList(ectx.rowHandler().factory(long.class).create(tbl.descriptor().systemView().size()));
}
/** {@inheritDoc} */
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
index 8451d68d277..81e6024b56b 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
@@ -144,7 +144,7 @@ public class AbstractBasicIntegrationTest extends
GridCommonAbstractTest {
}
/** */
- protected QueryChecker assertQuery(IgniteEx ignite, String qry) {
+ protected QueryChecker assertQuery(Ignite ignite, String qry) {
return new QueryChecker(qry) {
@Override protected QueryEngine getEngine() {
return Commons.lookupComponent(((IgniteEx)ignite).context(),
QueryEngine.class);
@@ -301,7 +301,7 @@ public class AbstractBasicIntegrationTest extends
GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override public long count(ExecutionContext<?> ectx, ColocationGroup
grp, boolean notNull) {
+ @Override public <Row> Iterable<Row> count(ExecutionContext<Row> ectx,
ColocationGroup grp, boolean notNull) {
return delegate.count(ectx, grp, notNull);
}
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTransactionalTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTransactionalTest.java
index e193a8b785e..f35e2c29a01 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTransactionalTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTransactionalTest.java
@@ -113,10 +113,10 @@ public abstract class
AbstractBasicIntegrationTransactionalTest extends Abstract
}
/** {@inheritDoc} */
- @Override protected QueryChecker assertQuery(IgniteEx ignite, String qry) {
+ @Override protected QueryChecker assertQuery(Ignite ignite, String qry) {
return new QueryChecker(qry, tx, sqlTxMode) {
@Override protected QueryEngine getEngine() {
- return Commons.lookupComponent(ignite.context(),
QueryEngine.class);
+ return Commons.lookupComponent(((IgniteEx)ignite).context(),
QueryEngine.class);
}
};
}
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/PartitionsReservationIntegrationTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/PartitionsReservationIntegrationTest.java
new file mode 100644
index 00000000000..242fe33b596
--- /dev/null
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/PartitionsReservationIntegrationTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterTopologyException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.processors.query.calcite.QueryChecker;
+import org.apache.ignite.internal.processors.query.calcite.exec.IndexScan;
+import org.apache.ignite.internal.processors.query.calcite.exec.TableScan;
+import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexCount;
+import org.apache.ignite.internal.util.typedef.X;
+import org.junit.Test;
+
+/**
+ * Tests partition reservation/releasing for queries over unstable topology.
+ */
+public class PartitionsReservationIntegrationTest extends
AbstractBasicIntegrationTest {
+ /** */
+ private static final int PARTS = 16;
+
+ /** */
+ private static final int KEYS = PARTS * 100_000;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ // No-op. Don't start any grids.
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ startGrids(2);
+
+ client = startClientGrid();
+
+ awaitPartitionMapExchange();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setCacheConfiguration(new CacheConfiguration<Integer,
Integer>(DEFAULT_CACHE_NAME)
+ .setIndexedTypes(Integer.class, Employer.class)
+ .setSqlSchema(QueryUtils.DFLT_SCHEMA)
+ .setAffinity(new RendezvousAffinityFunction(false, PARTS)));
+
+ return cfg;
+ }
+
+ /** */
+ @Test
+ public void testIndexCount() throws Exception {
+ checkPartitionsReservationRelease(
+ assertQuery("SELECT COUNT(_key) FROM Employer")
+
.matches(QueryChecker.containsSubPlan(IgniteIndexCount.class.getSimpleName()))
+ .ordered() // To avoid modification of QueryChecker.
+ .returns((long)KEYS));
+ }
+
+ /** */
+ @Test
+ public void testIndexScan() throws Exception {
+ checkPartitionsReservationRelease(
+ assertQuery("SELECT /*+ FORCE_INDEX */ * FROM Employer")
+
.matches(QueryChecker.containsSubPlan(IndexScan.class.getSimpleName()))
+ .resultSize(KEYS));
+ }
+
+ /** */
+ @Test
+ public void testTableScan() throws Exception {
+ checkPartitionsReservationRelease(
+ assertQuery("SELECT * FROM Employer")
+
.matches(QueryChecker.containsSubPlan(TableScan.class.getSimpleName()))
+ .resultSize(KEYS));
+ }
+
+ /** */
+ public void checkPartitionsReservationRelease(QueryChecker checker) throws
Exception {
+ try (IgniteDataStreamer<Object, Object> streamer =
client.dataStreamer(DEFAULT_CACHE_NAME)) {
+ for (int i = 0; i < KEYS; i++)
+ streamer.addData(i, new Employer("name" + i, (double)i));
+ }
+
+ AtomicBoolean stop = new AtomicBoolean();
+
+ IgniteInternalFuture<?> qryFut = multithreadedAsync(new Runnable() {
+ @Override public void run() {
+ while (!stop.get()) {
+ try {
+ checker.check();
+ }
+ catch (IgniteSQLException e) {
+ if (X.hasCause(e, ClusterTopologyException.class))
+ continue; // Expected when topology changed while
query starts.
+
+ throw e;
+ }
+ }
+ }
+ }, 10, "qry-thread");
+
+ // Trigger rebalance and partitions eviction.
+ startGrid(2);
+ startGrid(3);
+
+ awaitPartitionMapExchange();
+
+ doSleep(1_000L);
+
+ stop.set(true);
+
+ qryFut.get();
+ }
+}
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/UnstableTopologyTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/UnstableTopologyIntegrationTest.java
similarity index 86%
rename from
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/UnstableTopologyTest.java
rename to
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/UnstableTopologyIntegrationTest.java
index b1e5897690e..120642ee2b1 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/UnstableTopologyTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/UnstableTopologyIntegrationTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite;
+package org.apache.ignite.internal.processors.query.calcite.integration;
import java.util.Collection;
import java.util.Collections;
@@ -38,23 +38,19 @@ import org.apache.ignite.cache.QueryIndexType;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.processors.query.QueryEngine;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.spi.IgniteSpiAdapter;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.indexing.IndexingQueryFilter;
import org.apache.ignite.spi.indexing.IndexingSpi;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-/** Non stable topology tests. */
+/** Non-stable topology tests. */
@RunWith(Parameterized.class)
-public class UnstableTopologyTest extends GridCommonAbstractTest {
+public class UnstableTopologyIntegrationTest extends
AbstractBasicIntegrationTest {
/** */
private static final String POI_CACHE_NAME = "POI_CACHE";
@@ -101,10 +97,18 @@ public class UnstableTopologyTest extends
GridCommonAbstractTest {
@Parameterized.Parameter(1)
public boolean idxSlowDown;
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ // No-op. We don't need to start anything.
+ }
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+ if (idxSlowDown)
+ cfg.setIndexingSpi(new BlockingIndexingSpi());
+
cfg.setCacheConfiguration(new CacheConfiguration<>(POI_CACHE_NAME)
.setAtomicityMode(CacheAtomicityMode.ATOMIC)
.setSqlSchema(POI_SCHEMA_NAME)
@@ -149,22 +153,19 @@ public class UnstableTopologyTest extends
GridCommonAbstractTest {
*/
@Test
public void testSelectCorrectnessOnUnstableTopology() throws Exception {
- ignitionStart(0, idxSlowDown);
- IgniteEx ig = ignitionStart(1, idxSlowDown);
+ startGrids(2);
- loadData(ig, 0, NUM_ENTITIES);
+ loadData(grid(1), 0, NUM_ENTITIES);
- ignitionStart(2, idxSlowDown);
+ startGrid(2);
if (awaitExchange)
awaitPartitionMapExchange(true, true, null);
else
awaitPartitionMapExchange();
- QueryEngine engine = Commons.lookupComponent(grid(1).context(),
QueryEngine.class);
-
- G.allGrids().forEach(g -> assertEquals(NUM_ENTITIES,
engine.query(null, POI_SCHEMA_NAME,
- "SELECT * FROM " + POI_TABLE_NAME).get(0).getAll().size()));
+ G.allGrids().forEach(g -> assertQuery(g, "SELECT * FROM " +
POI_SCHEMA_NAME + '.' + POI_TABLE_NAME)
+ .resultSize(NUM_ENTITIES).check());
}
/** */
@@ -184,16 +185,6 @@ public class UnstableTopologyTest extends
GridCommonAbstractTest {
}
}
- /** Start with custon indexing SPI. */
- private IgniteEx ignitionStart(int idx, boolean slow) throws Exception {
- IgniteConfiguration cfg =
getConfiguration(getTestIgniteInstanceName(idx));
-
- if (slow)
- cfg.setIndexingSpi(new BlockingIndexingSpi());
-
- return startGrid(cfg);
- }
-
/**
* Simple blocking indexing SPI.
*/
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
index a062d3d3169..9117c21a6ec 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
@@ -22,7 +22,6 @@ import
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor
import org.apache.ignite.internal.processors.query.calcite.CancelTest;
import
org.apache.ignite.internal.processors.query.calcite.IndexWithSameNameCalciteTest;
import
org.apache.ignite.internal.processors.query.calcite.SqlFieldsQueryUsageTest;
-import
org.apache.ignite.internal.processors.query.calcite.UnstableTopologyTest;
import
org.apache.ignite.internal.processors.query.calcite.integration.AggregatesIntegrationTest;
import
org.apache.ignite.internal.processors.query.calcite.integration.AuthorizationIntegrationTest;
import
org.apache.ignite.internal.processors.query.calcite.integration.CalciteBasicSecondaryIndexIntegrationTest;
@@ -53,6 +52,7 @@ import
org.apache.ignite.internal.processors.query.calcite.integration.MemoryQuo
import
org.apache.ignite.internal.processors.query.calcite.integration.MetadataIntegrationTest;
import
org.apache.ignite.internal.processors.query.calcite.integration.OperatorsExtensionIntegrationTest;
import
org.apache.ignite.internal.processors.query.calcite.integration.PartitionPruneTest;
+import
org.apache.ignite.internal.processors.query.calcite.integration.PartitionsReservationIntegrationTest;
import
org.apache.ignite.internal.processors.query.calcite.integration.QueryEngineConfigurationIntegrationTest;
import
org.apache.ignite.internal.processors.query.calcite.integration.QueryMetadataIntegrationTest;
import
org.apache.ignite.internal.processors.query.calcite.integration.QueryWithPartitionsIntegrationTest;
@@ -67,6 +67,7 @@ import
org.apache.ignite.internal.processors.query.calcite.integration.SystemVie
import
org.apache.ignite.internal.processors.query.calcite.integration.TableDdlIntegrationTest;
import
org.apache.ignite.internal.processors.query.calcite.integration.TableDmlIntegrationTest;
import
org.apache.ignite.internal.processors.query.calcite.integration.TimeoutIntegrationTest;
+import
org.apache.ignite.internal.processors.query.calcite.integration.UnstableTopologyIntegrationTest;
import
org.apache.ignite.internal.processors.query.calcite.integration.UserDdlIntegrationTest;
import
org.apache.ignite.internal.processors.query.calcite.integration.UserDefinedFunctionsIntegrationTest;
import
org.apache.ignite.internal.processors.query.calcite.integration.UserDefinedFunctionsIntegrationTransactionalTest;
@@ -116,7 +117,8 @@ import org.junit.runners.Suite;
IndexScanlIntegrationTest.class,
IndexScanMultiNodeIntegrationTest.class,
SetOpIntegrationTest.class,
- UnstableTopologyTest.class,
+ UnstableTopologyIntegrationTest.class,
+ PartitionsReservationIntegrationTest.class,
JoinCommuteRulesTest.class,
ServerStatisticsIntegrationTest.class,
JoinIntegrationTest.class,