This is an automated email from the ASF dual-hosted git repository.
timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 34a850b1827 IGNITE-16452 Fix IndexQuery can query rebuilding index.
(#12378)
34a850b1827 is described below
commit 34a850b18277acb981001da3c6ca3d3733eea466
Author: Ashesha <[email protected]>
AuthorDate: Fri Oct 10 16:19:10 2025 +0700
IGNITE-16452 Fix IndexQuery can query rebuilding index. (#12378)
Co-authored-by: Maksim Timonin <[email protected]>
---
.../internal/cache/query/index/AbstractIndex.java | 12 +-
.../ignite/internal/cache/query/index/Index.java | 10 +
.../internal/cache/query/index/IndexProcessor.java | 16 +-
.../cache/query/index/IndexQueryProcessor.java | 5 +
.../cache/query/IndexQueryRebuildIndexTest.java | 202 +++++++++++++++++++++
.../ignite/cache/query/IndexQueryTestSuite.java | 1 +
6 files changed, 230 insertions(+), 16 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/AbstractIndex.java
b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/AbstractIndex.java
index 59ba127c08a..6c29b328cc3 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/AbstractIndex.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/AbstractIndex.java
@@ -26,17 +26,13 @@ public abstract class AbstractIndex implements Index {
/** Whether index is rebuilding now. */
private final AtomicBoolean rebuildInProgress = new AtomicBoolean(false);
- /**
- * @param val Mark or unmark index to rebuild.
- */
- public void markIndexRebuild(boolean val) {
+ /** {@inheritDoc} */
+ @Override public void markIndexRebuild(boolean val) {
rebuildInProgress.compareAndSet(!val, val);
}
- /**
- * @return Whether index is rebuilding now.
- */
- public boolean rebuildInProgress() {
+ /** {@inheritDoc} */
+ @Override public boolean rebuildInProgress() {
return rebuildInProgress.get();
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/Index.java
b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/Index.java
index 45984b40c25..e0c1d0c1295 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/Index.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/Index.java
@@ -71,4 +71,14 @@ public interface Index {
* @return Index definition.
*/
public IndexDefinition indexDefinition();
+
+ /**
+ * @param val Mark or unmark index to (re)build.
+ */
+ public void markIndexRebuild(boolean val);
+
+ /**
+ * @return Whether index is (re)building now.
+ */
+ public boolean rebuildInProgress();
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexProcessor.java
index 3bdc9187270..35ae189f8e7 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexProcessor.java
@@ -212,9 +212,12 @@ public class IndexProcessor extends GridProcessorAdapter {
* @param definition Description of an index to create.
* @param cacheVisitor Enable to cancel dynamic index populating.
*/
- public Index createIndexDynamically(GridCacheContext cctx, IndexFactory
factory, IndexDefinition definition,
- SchemaIndexCacheVisitor cacheVisitor) {
-
+ public Index createIndexDynamically(
+ GridCacheContext<?, ?> cctx,
+ IndexFactory factory,
+ IndexDefinition definition,
+ SchemaIndexCacheVisitor cacheVisitor
+ ) {
Index idx = createIndex(cctx, factory, definition);
// Populate index with cache rows.
@@ -345,11 +348,8 @@ public class IndexProcessor extends GridProcessorAdapter {
Collection<Index> idxs = cacheToIdx.get(cctx.name()).values();
- for (Index idx: idxs) {
- if (idx instanceof AbstractIndex)
- ((AbstractIndex)idx).markIndexRebuild(val);
- }
-
+ for (Index idx: idxs)
+ idx.markIndexRebuild(val);
}
finally {
ddlLock.readLock().unlock();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexQueryProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexQueryProcessor.java
index bef88ec2e41..27dd4176c22 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexQueryProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexQueryProcessor.java
@@ -84,6 +84,11 @@ public class IndexQueryProcessor {
) throws IgniteCheckedException {
InlineIndexImpl idx = (InlineIndexImpl)findSortedIndex(cctx,
idxQryDesc);
+ if (idx.rebuildInProgress()) {
+ throw new IgniteCheckedException(String.format("Failed to run
IndexQuery due to index rebuild is in progress"
+ + " [index=%s, query=%s]",
idx.indexDefinition().idxName(), idxQryDesc));
+ }
+
IndexMultipleRangeQuery qry = prepareQuery(idx, idxQryDesc);
GridCursor<IndexRow> cursor = queryMultipleRanges(idx, cacheFilter,
qry);
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryRebuildIndexTest.java
b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryRebuildIndexTest.java
new file mode 100644
index 00000000000..ed812a8eabc
--- /dev/null
+++
b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryRebuildIndexTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.query;
+
+import java.util.LinkedHashMap;
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.cache.query.index.IndexProcessor;
+import org.apache.ignite.internal.managers.indexing.IndexesRebuildTask;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import
org.apache.ignite.internal.processors.query.schema.IndexRebuildCancelToken;
+import
org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.query.IndexQueryCriteriaBuilder.between;
+
+/** */
+@RunWith(Parameterized.class)
+public class IndexQueryRebuildIndexTest extends GridCommonAbstractTest {
+ /** */
+ private static final String CACHE = "TEST_CACHE";
+
+ /** */
+ private static final String IDX = "TEST_IDX";
+
+ /** */
+ private static final int CNT = 10_000;
+
+ /** */
+ private boolean persistenceEnabled;
+
+ /** */
+ @Parameterized.Parameter
+ public String qryNode;
+
+ /** */
+ @Parameterized.Parameters(name = "qryNode={0}")
+ public static Object[] parameters() {
+ return new Object[] { "CRD", "CLN" };
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ QueryEntity qe = new QueryEntity(Long.class.getName(),
Integer.class.getName())
+ .setTableName("Person")
+ .setKeyFieldName("id")
+ .setValueFieldName("fld")
+ .setFields(new LinkedHashMap<>(
+ F.asMap("id", Long.class.getName(), "fld",
Integer.class.getName()))
+ );
+
+ CacheConfiguration<Long, Integer> ccfg = new CacheConfiguration<Long,
Integer>()
+ .setName(CACHE)
+ .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+ .setQueryEntities(F.asList(qe))
+ .setBackups(1);
+
+ cfg.setDataStorageConfiguration(
+ new DataStorageConfiguration().setDefaultDataRegionConfiguration(
+ new
DataRegionConfiguration().setPersistenceEnabled(persistenceEnabled)));
+
+ cfg.setCacheConfiguration(ccfg);
+
+ return cfg;
+ }
+
+ /** */
+ @Test
+ public void testConcurrentRebuildIndex() throws Exception {
+ persistenceEnabled = true;
+
+ IndexProcessor.idxRebuildCls = BlockingRebuildIndexes.class;
+
+ Ignite crd = startGrids(3);
+
+ crd.cluster().state(ClusterState.ACTIVE);
+
+ IgniteCache<Long, Integer> cache = cache();
+
+ cache.query(new SqlFieldsQuery("create index " + IDX + " on
Person(fld)")).getAll();
+
+ insertData();
+
+ BlockingRebuildIndexes rebuild =
(BlockingRebuildIndexes)grid(0).context().indexProcessor().idxRebuild();
+
+ rebuild.setUp();
+
+ multithreadedAsync(() -> {
+ forceRebuildIndexes(grid(0), grid(0).cachex(CACHE).context());
+ }, 1);
+
+ rebuild.idxRebuildStartLatch.await();
+
+ IndexQuery<Long, Integer> qry = new IndexQuery<Long,
Integer>(Integer.class)
+ .setCriteria(between("fld", 0, CNT));
+
+ GridTestUtils.assertThrows(null,
+ () -> {
+ cache.query(qry).getAll();
+ }, IgniteException.class, "Failed to run IndexQuery due to index
rebuild is in progress");
+
+ rebuild.blockIdxRebuildLatch.countDown();
+
+ crd.cache(CACHE).indexReadyFuture().get();
+
+ assertEquals(CNT, cache.query(qry).getAll().size());
+ }
+
+ /** */
+ private void insertData() {
+ try (IgniteDataStreamer<Long, Integer> streamer =
grid(0).dataStreamer(CACHE)) {
+ for (int i = 0; i < CNT; i++)
+ streamer.addData((long)i, i);
+ }
+ }
+
+ /** */
+ private IgniteCache<Long, Integer> cache() throws Exception {
+ Ignite n = "CRD".equals(qryNode) ? grid(0) : startClientGrid();
+
+ return n.cache(CACHE);
+ }
+
+ /** Blocks filling dynamically created index with cache data. */
+ public static class BlockingRebuildIndexes extends IndexesRebuildTask {
+ /** */
+ public volatile CountDownLatch blockIdxRebuildLatch = new
CountDownLatch(0);
+
+ /** */
+ public volatile CountDownLatch idxRebuildStartLatch = new
CountDownLatch(0);
+
+ /** {@inheritDoc} */
+ @Override protected void startRebuild(
+ GridCacheContext cctx,
+ GridFutureAdapter<Void> fut,
+ SchemaIndexCacheVisitorClosure clo,
+ IndexRebuildCancelToken cancelTok
+ ) {
+ try {
+ idxRebuildStartLatch.countDown();
+
+ blockIdxRebuildLatch.await();
+
+ super.startRebuild(cctx, fut, clo, cancelTok);
+ }
+ catch (InterruptedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /** */
+ public void setUp() {
+ blockIdxRebuildLatch = new CountDownLatch(1);
+ idxRebuildStartLatch = new CountDownLatch(1);
+ }
+ }
+}
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryTestSuite.java
b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryTestSuite.java
index 906b2e2b673..ed392ae42c3 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryTestSuite.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryTestSuite.java
@@ -36,6 +36,7 @@ import org.junit.runners.Suite;
IndexQuerySqlIndexTest.class,
IndexQueryRangeTest.class,
IndexQueryPartitionTest.class,
+ IndexQueryRebuildIndexTest.class,
IndexQueryCacheKeyValueFieldsTest.class,
IndexQueryCacheKeyValueEscapedFieldsTest.class,
IndexQueryCacheKeyValueTransformedFieldsTest.class,