This is an automated email from the ASF dual-hosted git repository.
timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new a9a1fec0124 IGNITE-26683 IndexQuery can't use an index until data
population completes (#12412)
a9a1fec0124 is described below
commit a9a1fec012454d60bc6e92c42e01256b6b3cfcc0
Author: Ashesha <[email protected]>
AuthorDate: Sun Nov 16 23:33:12 2025 +0700
IGNITE-26683 IndexQuery can't use an index until data population completes
(#12412)
---
.../internal/cache/query/index/IndexName.java | 17 ++
.../internal/cache/query/index/IndexProcessor.java | 101 ++++++++--
.../cache/query/index/IndexQueryProcessor.java | 6 +-
.../cache/query/IndexQueryBuildIndexTest.java | 200 ++++++++++++++++++++
.../ignite/cache/query/IndexQueryTestSuite.java | 2 +
.../ignite/cache/query/SqlQueryBuildIndexTest.java | 203 +++++++++++++++++++++
6 files changed, 515 insertions(+), 14 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexName.java
b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexName.java
index 19229945ec6..564895a6568 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexName.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexName.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.cache.query.index;
+import java.util.Objects;
import org.jetbrains.annotations.Nullable;
/**
@@ -90,4 +91,20 @@ public class IndexName {
@Override public String toString() {
return fullName();
}
+
+ /** */
+ @Override public boolean equals(Object o) {
+ if (!(o instanceof IndexName))
+ return false;
+
+ IndexName name = (IndexName)o;
+
+ return Objects.equals(schemaName, name.schemaName) &&
Objects.equals(tableName, name.tableName)
+ && Objects.equals(cacheName, name.cacheName) &&
Objects.equals(idxName, name.idxName);
+ }
+
+ /** */
+ @Override public int hashCode() {
+ return Objects.hash(schemaName, tableName, cacheName, idxName);
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexProcessor.java
index 5feb6478adf..af194941398 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexProcessor.java
@@ -21,8 +21,10 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
@@ -115,6 +117,9 @@ public class IndexProcessor extends GridProcessorAdapter {
/** Exclusive lock for DDL operations. */
private final ReentrantReadWriteLock ddlLock = new
ReentrantReadWriteLock();
+ /** Set of index names currently in initial population (null if none). */
+ private @Nullable Set<IndexName> fillingIdxs;
+
/**
* @param ctx Kernal context.
*/
@@ -211,15 +216,45 @@ public class IndexProcessor extends GridProcessorAdapter {
IndexDefinition definition,
SchemaIndexCacheVisitor cacheVisitor
) {
- Index idx = createIndex(cctx, factory, definition);
+ IndexFactory dynamicFactory = (gcctx, indexDefinition) -> {
+ Index idx = factory.createIndex(gcctx, indexDefinition);
+
+ assert ddlLock.isWriteLockedByCurrentThread();
+
+ if (fillingIdxs == null)
+ fillingIdxs = new HashSet<>();
+
+ fillingIdxs.add(indexDefinition.idxName());
+
+ return idx;
+ };
+
+ try {
+ Index idx = createIndex(cctx, dynamicFactory, definition);
- // Populate index with cache rows.
- cacheVisitor.visit(row -> {
- if (idx.canHandle(row))
- idx.onUpdate(null, row, false);
- });
+ // Populate index with cache rows.
+ cacheVisitor.visit(row -> {
+ if (idx.canHandle(row))
+ idx.onUpdate(null, row, false);
+ });
- return idx;
+ return idx;
+ }
+ finally {
+ ddlLock.writeLock().lock();
+
+ try {
+ if (fillingIdxs != null) {
+ fillingIdxs.remove(definition.idxName());
+
+ if (fillingIdxs.isEmpty())
+ fillingIdxs = null;
+ }
+ }
+ finally {
+ ddlLock.writeLock().unlock();
+ }
+ }
}
/**
@@ -272,8 +307,18 @@ public class IndexProcessor extends GridProcessorAdapter {
Index idx = idxs.remove(idxName.fullName());
- if (idx != null)
+ if (idx != null) {
idx.destroy(softDelete);
+
+ if (fillingIdxs != null) {
+ fillingIdxs.remove(idxName);
+
+ if (fillingIdxs.isEmpty())
+ fillingIdxs = null;
+ }
+
+ }
+
}
finally {
ddlLock.writeLock().unlock();
@@ -371,16 +416,33 @@ public class IndexProcessor extends GridProcessorAdapter {
* @return Collection of indexes for specified cache.
*/
public Collection<Index> indexes(String cacheName) {
+ return indexes(cacheName, false);
+ }
+
+ /**
+ * Returns collection of indexes for specified cache.
+ *
+ * @param cacheName Cache name.
+ * @param skipFilling If {@code true}, indexes that are currently being
initially populated are excluded
+ * from the result; if {@code false}, all known indexes for the cache
are returned.
+ * @return Collection of indexes for specified cache.
+ */
+
+ public Collection<Index> indexes(String cacheName, boolean skipFilling) {
ddlLock.readLock().lock();
try {
- Map<String, Index> idxs = cacheToIdx.get(cacheName);
+ Map<String, Index> idxMap = cacheToIdx.get(cacheName);
- if (idxs == null)
+ if (idxMap == null)
return Collections.emptyList();
- return idxs.values();
+ List<Index> idxs = new ArrayList<>(idxMap.values());
+ if (skipFilling && fillingIdxs != null)
+ idxs.removeIf(idx ->
fillingIdxs.contains(idx.indexDefinition().idxName()));
+
+ return idxs;
}
finally {
ddlLock.readLock().unlock();
@@ -394,9 +456,26 @@ public class IndexProcessor extends GridProcessorAdapter {
* @return Index for specified index name or {@code null} if not found.
*/
public @Nullable Index index(IndexName idxName) {
+ return index(idxName, false);
+ }
+
+ /**
+ * Returns index for specified name.
+ *
+ * @param idxName Index name.
+ * @param skipFilling If {@code true}, returns {@code null} when the index
is currently being initially
+ * populated and therefore should be skipped; if {@code false},
returns the index if present.
+ * @return Index for specified index name or {@code null} if not found (or
skipped due to population when
+ * {@code skipFilling} is {@code true}).
+ */
+
+ public @Nullable Index index(IndexName idxName, boolean skipFilling) {
ddlLock.readLock().lock();
try {
+ if (skipFilling && fillingIdxs != null &&
fillingIdxs.contains(idxName))
+ return null;
+
Map<String, Index> idxs = cacheToIdx.get(idxName.cacheName());
if (idxs == null)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexQueryProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexQueryProcessor.java
index 5b2b1637577..6ba30762379 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexQueryProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexQueryProcessor.java
@@ -217,14 +217,14 @@ public class IndexQueryProcessor {
* @throws IgniteCheckedException If index not found or specified index
doesn't match query criteria.
*/
private SortedSegmentedIndex indexByName(IndexName idxName, final
Map<String, String> criteriaFlds) throws IgniteCheckedException {
- SortedSegmentedIndex idx = assertSortedIndex(idxProc.index(idxName));
+ SortedSegmentedIndex idx = assertSortedIndex(idxProc.index(idxName,
true));
if (idx == null &&
!QueryUtils.PRIMARY_KEY_INDEX.equals(idxName.idxName())) {
String normIdxName =
QueryUtils.normalizeObjectName(idxName.idxName(), false);
idxName = new IndexName(idxName.cacheName(), idxName.schemaName(),
idxName.tableName(), normIdxName);
- idx = assertSortedIndex(idxProc.index(idxName));
+ idx = assertSortedIndex(idxProc.index(idxName, true));
}
if (idx == null)
@@ -245,7 +245,7 @@ public class IndexQueryProcessor {
final Map<String, String> criteriaFlds,
String tableName
) throws IgniteCheckedException {
- Collection<Index> idxs = idxProc.indexes(cctx.name());
+ Collection<Index> idxs = idxProc.indexes(cctx.name(), true);
for (Index idx: idxs) {
SortedSegmentedIndex sortedIdx = assertSortedIndex(idx);
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryBuildIndexTest.java
b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryBuildIndexTest.java
new file mode 100644
index 00000000000..98dc467316e
--- /dev/null
+++
b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryBuildIndexTest.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.query;
+
+import java.util.LinkedHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.cache.query.index.IndexName;
+import org.apache.ignite.internal.cache.query.index.IndexProcessor;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.query.IndexQueryCriteriaBuilder.between;
+
+/** */
+@RunWith(Parameterized.class)
+public class IndexQueryBuildIndexTest extends GridCommonAbstractTest {
+ /** */
+ private static final String CACHE = "TEST_CACHE";
+
+ /** */
+ private static final String TBL = "Person";
+
+ /** */
+ private static final String IDX = "TEST_IDX";
+
+ /** */
+ private static final int CNT = 10_000;
+
+ /** */
+ @Parameterized.Parameter
+ public String qryNode;
+
+ /** */
+ @Parameterized.Parameters(name = "qryNode={0}")
+ public static Object[] parameters() {
+ return new Object[] { "CRD", "CLN" };
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ QueryEntity qe = new QueryEntity(Long.class.getName(),
Integer.class.getName())
+ .setTableName(TBL)
+ .setKeyFieldName("id")
+ .setValueFieldName("fld")
+ .setFields(new LinkedHashMap<>(
+ F.asMap("id", Long.class.getName(), "fld",
Integer.class.getName()))
+ );
+
+ CacheConfiguration<Long, Integer> ccfg = new CacheConfiguration<Long,
Integer>()
+ .setName(CACHE)
+ .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+ .setQueryEntities(F.asList(qe))
+ .setBackups(1);
+
+ cfg.setDataStorageConfiguration(
+ new DataStorageConfiguration().setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration().setPersistenceEnabled(false)));
+
+ cfg.setCacheConfiguration(ccfg);
+
+ cfg.setBuildIndexThreadPoolSize(1);
+
+ return cfg;
+ }
+
+ /** */
+ @Test
+ public void testConcurrentCreateIndex() throws Exception {
+ IgniteEx crd = startGrids(3);
+
+ crd.cluster().state(ClusterState.ACTIVE);
+
+ IgniteCache<Long, Integer> cache = cache();
+
+ insertData();
+
+ CountDownLatch idxBuildGate = new CountDownLatch(1);
+
+ CountDownLatch workerParked = new CountDownLatch(1);
+
+ Future<?> blocker =
crd.context().pools().buildIndexExecutorService().submit(() -> {
+ workerParked.countDown();
+
+ try {
+ idxBuildGate.await(30, TimeUnit.SECONDS);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ });
+
+ try {
+ assertTrue("blocker must be parked", workerParked.await(10,
TimeUnit.SECONDS));
+
+ multithreadedAsync(() -> {
+ SqlFieldsQuery idxCreate = new SqlFieldsQuery("create index "
+ IDX + " on " + TBL + "(fld)");
+
+ cache.query(idxCreate).getAll();
+ }, 1);
+
+ IndexProcessor ip = crd.context().indexProcessor();
+
+ IndexName name = new IndexName(cache.getName(), CACHE,
TBL.toUpperCase(), IDX);
+
+ boolean seenBuilding = GridTestUtils.waitForCondition(() ->
ip.index(name) != null, 10_000);
+
+ assertTrue("Index must exist", seenBuilding);
+
+ IndexQuery<Long, Integer> qry = new IndexQuery<Long,
Integer>(Integer.class, IDX)
+ .setCriteria(between("fld", 0, CNT - 1));
+
+ GridTestUtils.assertThrows(
+ null,
+ () -> {
+ cache.query(qry).getAll();
+ },
+ IgniteException.class,
+ "No index found for"
+ );
+
+ idxBuildGate.countDown();
+
+ crd.cache(CACHE).indexReadyFuture().get(30_000);
+
+ boolean done = GridTestUtils.waitForCondition(() -> ip.index(name,
true) != null, 20_000);
+
+ assertTrue("Build must finish", done);
+
+ assertEquals(CNT, cache.query(qry).getAll().size());
+ }
+ finally {
+ idxBuildGate.countDown();
+
+ blocker.cancel(true);
+ }
+ }
+
+ /** */
+ private void insertData() {
+ try (IgniteDataStreamer<Long, Integer> streamer =
grid(0).dataStreamer(CACHE)) {
+ for (int i = 0; i < CNT; i++)
+ streamer.addData((long)i, i);
+ }
+ }
+
+ /** */
+ private IgniteCache<Long, Integer> cache() throws Exception {
+ Ignite n = "CRD".equals(qryNode) ? grid(0) : startClientGrid();
+
+ return n.cache(CACHE);
+ }
+
+}
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryTestSuite.java
b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryTestSuite.java
index ed392ae42c3..c856f9e7ccf 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryTestSuite.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryTestSuite.java
@@ -37,6 +37,8 @@ import org.junit.runners.Suite;
IndexQueryRangeTest.class,
IndexQueryPartitionTest.class,
IndexQueryRebuildIndexTest.class,
+ IndexQueryBuildIndexTest.class,
+ SqlQueryBuildIndexTest.class,
IndexQueryCacheKeyValueFieldsTest.class,
IndexQueryCacheKeyValueEscapedFieldsTest.class,
IndexQueryCacheKeyValueTransformedFieldsTest.class,
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/cache/query/SqlQueryBuildIndexTest.java
b/modules/indexing/src/test/java/org/apache/ignite/cache/query/SqlQueryBuildIndexTest.java
new file mode 100644
index 00000000000..46194dc8063
--- /dev/null
+++
b/modules/indexing/src/test/java/org/apache/ignite/cache/query/SqlQueryBuildIndexTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.query;
+
+import java.util.LinkedHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.cache.query.index.IndexName;
+import org.apache.ignite.internal.cache.query.index.IndexProcessor;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/** */
+@RunWith(Parameterized.class)
+public class SqlQueryBuildIndexTest extends GridCommonAbstractTest {
+ /** */
+ private static final String CACHE = "TEST_CACHE";
+
+ /** */
+ private static final String TBL = "Person";
+
+ /** */
+ private static final String IDX = "TEST_IDX";
+
+ /** */
+ private static final int CNT = 10_000;
+
+ /** */
+ @Parameterized.Parameter
+ public String qryNode;
+
+ /** */
+ @Parameterized.Parameters(name = "qryNode={0}")
+ public static Object[] parameters() {
+ return new Object[] {"CRD", "CLN"};
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override protected void beforeTest() throws Exception {
+ cleanPersistenceDir();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ cleanPersistenceDir();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ QueryEntity qe = new QueryEntity(Long.class.getName(),
Integer.class.getName())
+ .setTableName(TBL)
+ .setKeyFieldName("id")
+ .setValueFieldName("fld")
+ .setFields(new LinkedHashMap<>(F.asMap("id", Long.class.getName(),
"fld", Integer.class.getName())));
+
+ CacheConfiguration<Long, Integer> ccfg = new CacheConfiguration<Long,
Integer>()
+ .setName(CACHE)
+ .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+ .setQueryEntities(F.asList(qe))
+ .setBackups(1);
+
+ cfg.setDataStorageConfiguration(
+ new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(new
DataRegionConfiguration().setPersistenceEnabled(false))
+ );
+
+ cfg.setCacheConfiguration(ccfg);
+
+ cfg.setBuildIndexThreadPoolSize(1);
+
+ return cfg;
+ }
+
+ /** */
+ @Test
+ public void testConcurrentCreateIndex() throws Exception {
+ IgniteEx crd = startGrids(3);
+
+ crd.cluster().state(ClusterState.ACTIVE);
+
+ IgniteCache<Long, Integer> cache = cache();
+
+ insertData();
+
+ CountDownLatch idxBuildGate = new CountDownLatch(1);
+
+ CountDownLatch workerParked = new CountDownLatch(1);
+
+ Future<?> blocker =
crd.context().pools().buildIndexExecutorService().submit(() -> {
+ workerParked.countDown();
+
+ try {
+ idxBuildGate.await(30, TimeUnit.SECONDS);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ });
+
+ try {
+ assertTrue("blocker must be parked", workerParked.await(10,
TimeUnit.SECONDS));
+
+ multithreadedAsync(() -> {
+ SqlFieldsQuery ddl = new SqlFieldsQuery("CREATE INDEX " + IDX
+ " ON " + TBL + "(fld)");
+
+ cache.query(ddl).getAll();
+ }, 1);
+
+ IndexProcessor ip = crd.context().indexProcessor();
+
+ IndexName name = new IndexName(cache.getName(), CACHE,
TBL.toUpperCase(), IDX);
+
+ boolean seenBuilding = GridTestUtils.waitForCondition(() ->
ip.index(name) != null, 10_000);
+
+ assertTrue("Index must exist", seenBuilding);
+
+ String sql = "SELECT id FROM " + TBL + " USE INDEX(" + IDX + ")
WHERE fld BETWEEN ? AND ? ORDER BY id";
+
+ SqlFieldsQuery qry = new SqlFieldsQuery(sql).setArgs(0, CNT - 1);
+
+ GridTestUtils.assertThrows(
+ null,
+ () -> {
+ cache.query(qry).getAll();
+ },
+ IgniteException.class,
+ "Failed to parse query. Index \"" + IDX + "\" not found; SQL
statement"
+ );
+
+ idxBuildGate.countDown();
+
+ crd.cache(CACHE).indexReadyFuture().get(30_000);
+
+ boolean done = GridTestUtils.waitForCondition(() -> ip.index(name,
true) != null, 20_000);
+
+ assertTrue("Build must finish", done);
+
+ assertEquals(CNT, cache.query(qry).getAll().size());
+ }
+ finally {
+ idxBuildGate.countDown();
+
+ blocker.cancel(true);
+ }
+ }
+
+ /** */
+ private void insertData() {
+ try (IgniteDataStreamer<Long, Integer> streamer =
grid(0).dataStreamer(CACHE)) {
+ for (int i = 0; i < CNT; i++)
+ streamer.addData((long)i, i);
+ }
+ }
+
+ /** */
+ private IgniteCache<Long, Integer> cache() throws Exception {
+ Ignite n = "CRD".equals(qryNode) ? grid(0) : startClientGrid();
+ return n.cache(CACHE);
+ }
+
+}