This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 287eb66 IGNITE-12794 Fix "Unexpected row key" assertion during scan
query - Fixes #7541.
287eb66 is described below
commit 287eb66ce4c838c054768cd90047d66309603ffe
Author: Denis Mekhanikov <[email protected]>
AuthorDate: Tue Oct 27 10:24:41 2020 +0300
IGNITE-12794 Fix "Unexpected row key" assertion during scan query - Fixes
#7541.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../processors/cache/GridCacheMapEntry.java | 3 +-
.../ScanQueryConcurrentUpdatesAbstractTest.java | 210 +++++++++++++++++++++
.../query/ScanQueryConcurrentUpdatesTest.java | 54 ++++++
.../query/ScanQueryConcurrentSqlUpdatesTest.java | 87 +++++++++
.../IgniteBinaryCacheQueryTestSuite.java | 4 +
5 files changed, 357 insertions(+), 1 deletion(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index de4edb0..b38b894 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -564,7 +564,8 @@ public abstract class GridCacheMapEntry extends
GridMetadataAwareAdapter impleme
checkObsolete();
if (isStartVersion() && ((flags & IS_UNSWAPPED_MASK) == 0)) {
- assert row == null || row.key() == key : "Unexpected row key";
+ assert row == null || Objects.equals(row.key(), key) :
+ "Unexpected row key [row.key=" + row.key() + ",
cacheEntry.key=" + key + "]";
CacheDataRow read = row == null ? cctx.offheap().read(this) :
row;
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/ScanQueryConcurrentUpdatesAbstractTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/ScanQueryConcurrentUpdatesAbstractTest.java
new file mode 100644
index 0000000..502a628
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/ScanQueryConcurrentUpdatesAbstractTest.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import javax.cache.Cache;
+import javax.cache.expiry.Duration;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A base for tests that check the behaviour of scan queries run on a data set
that is modified concurrently.
+ * Actual tests should implement a way of cache creation, modification and
destruction.
+ */
+public abstract class ScanQueryConcurrentUpdatesAbstractTest extends
GridCommonAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ startGrids(4);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * Creates a cache with given parameters.
+ *
+ * @param cacheName Name of the cache.
+ * @param cacheMode Cache mode.
+ * @param expiration {@link Duration} for {@link
javax.cache.expiry.ExpiryPolicy}. If {@code null}, then
+ * {@link javax.cache.expiry.ExpiryPolicy} won't be configured.
+ *
+ * @return Instance of the created cache.
+ */
+ protected abstract IgniteCache<Integer, Integer> createCache(String
cacheName, CacheMode cacheMode,
+ Duration expiration);
+
+ /**
+ * Performs modification of a provided cache. Records with keys in range
{@code 0..(recordsNum - 1)} are updated.
+ *
+ * @param cache Cache to update.
+ * @param recordsNum Number of records to update.
+ */
+ protected abstract void updateCache(IgniteCache<Integer, Integer> cache,
int recordsNum);
+
+ /**
+ * Destroys the provided cache.
+ *
+ * @param cache Cache to destroy.
+ */
+ protected abstract void destroyCache(IgniteCache<Integer, Integer> cache);
+
+ /**
+ * Tests behaviour of scan queries with concurrent modification.
+ *
+ * @param cache Cache to test.
+ * @param recordsNum Number of records to load to the cache.
+ */
+ private void testStableDataset(IgniteCache<Integer, Integer> cache, int
recordsNum) {
+ int iterations = 1000;
+
+ AtomicBoolean finished = new AtomicBoolean();
+
+ try {
+ updateCache(cache, recordsNum);
+ GridTestUtils.runAsync(() -> {
+ while (!finished.get())
+ updateCache(cache, recordsNum);
+ });
+
+ for (int i = 0; i < iterations; i++) {
+ List<Cache.Entry<Integer, Integer>> res = cache.query(new
ScanQuery<Integer, Integer>()).getAll();
+
+ assertEquals("Unexpected query result size.", recordsNum,
res.size());
+
+ for (Cache.Entry<Integer, Integer> e : res)
+ assertEquals(e.getKey(), e.getValue());
+ }
+ }
+ finally {
+ finished.set(true);
+ destroyCache(cache);
+ }
+ }
+
+ /**
+ * Tests behaviour of scan queries with entries expired and modified
concurrently.
+ *
+ * @param cache Cache to test.
+ */
+ private void testExpiringDataset(IgniteCache<Integer, Integer> cache) {
+ int iterations = 100;
+ int recordsNum = 100;
+
+ try {
+ for (int i = 0; i < iterations; i++) {
+ updateCache(cache, recordsNum);
+
+ long updateTime = U.currentTimeMillis();
+
+ List<Cache.Entry<Integer, Integer>> res = cache.query(new
ScanQuery<Integer, Integer>()).getAll();
+
+ assertTrue("Query result set is too big: " + res.size(),
res.size() <= recordsNum);
+
+ for (Cache.Entry<Integer, Integer> e : res)
+ assertEquals(e.getKey(), e.getValue());
+
+ while (U.currentTimeMillis() == updateTime)
+ doSleep(10L);
+ }
+ }
+ finally {
+ destroyCache(cache);
+ }
+ }
+
+ /** */
+ @Test
+ public void testReplicatedOneRecordLongExpiry() {
+ testStableDataset(createCache("replicated_long_expiry",
+ CacheMode.REPLICATED, Duration.ONE_HOUR), 1);
+ }
+
+ /** */
+ @Test
+ public void testReplicatedManyRecordsLongExpiry() {
+ testStableDataset(createCache("replicated_long_expiry",
+ CacheMode.REPLICATED, Duration.ONE_HOUR), 1000);
+ }
+
+ /** */
+ @Test
+ public void testReplicatedOneRecordNoExpiry() {
+ testStableDataset(createCache("replicated_no_expiry",
+ CacheMode.REPLICATED, null), 1);
+ }
+
+ /** */
+ @Test
+ public void testReplicatedManyRecordsNoExpiry() {
+ testStableDataset(createCache("replicated_no_expiry",
+ CacheMode.REPLICATED, null), 1000);
+ }
+
+ /** */
+ @Test
+ public void testPartitionedOneRecordLongExpiry() {
+ testStableDataset(createCache("partitioned_long_expiry",
+ CacheMode.PARTITIONED, Duration.ONE_HOUR), 1);
+ }
+
+ /** */
+ @Test
+ public void testPartitionedManyRecordsLongExpiry() {
+ testStableDataset(createCache("partitioned_long_expiry",
+ CacheMode.PARTITIONED, Duration.ONE_HOUR), 1000);
+ }
+
+ /** */
+ @Test
+ public void testPartitionedOneRecordNoExpiry() {
+ testStableDataset(createCache("partitioned_no_expiry",
+ CacheMode.PARTITIONED, null), 1);
+ }
+
+ /** */
+ @Test
+ public void testPartitionedManyRecordsNoExpiry() {
+ testStableDataset(createCache("partitioned_no_expiry",
+ CacheMode.PARTITIONED, null), 1000);
+ }
+
+ /** */
+ @Test
+ public void testPartitionedShortExpiry() {
+ testExpiringDataset(createCache("partitioned_short_expiry",
+ CacheMode.PARTITIONED, new Duration(TimeUnit.MILLISECONDS, 1)));
+ }
+
+ /** */
+ @Test
+ public void testReplicatedShortExpiry() {
+ testExpiringDataset(createCache("partitioned_short_expiry",
+ CacheMode.REPLICATED, new Duration(TimeUnit.MILLISECONDS, 1)));
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/ScanQueryConcurrentUpdatesTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/ScanQueryConcurrentUpdatesTest.java
new file mode 100644
index 0000000..17dd603
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/ScanQueryConcurrentUpdatesTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+
+/**
+ * {@link ScanQueryConcurrentUpdatesAbstractTest} with caches created, updates
and destroyed using Java API.
+ */
+public class ScanQueryConcurrentUpdatesTest extends
ScanQueryConcurrentUpdatesAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteCache<Integer, Integer> createCache(String
cacheName, CacheMode cacheMode,
+ Duration
expiration) {
+ CacheConfiguration<Integer, Integer> cacheCfg = new
CacheConfiguration<>(cacheName);
+ cacheCfg.setCacheMode(cacheMode);
+ if (expiration != null) {
+
cacheCfg.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(expiration));
+ cacheCfg.setEagerTtl(true);
+ }
+
+ return grid(0).createCache(cacheCfg);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void updateCache(IgniteCache<Integer, Integer> cache,
int recordsNum) {
+ for (int i = 0; i < recordsNum; i++)
+ cache.put(i, i);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void destroyCache(IgniteCache<Integer, Integer> cache)
{
+ cache.destroy();
+ }
+}
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/query/ScanQueryConcurrentSqlUpdatesTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/query/ScanQueryConcurrentSqlUpdatesTest.java
new file mode 100644
index 0000000..bb67535
--- /dev/null
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/query/ScanQueryConcurrentSqlUpdatesTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+
+/**
+ * {@link ScanQueryConcurrentUpdatesAbstractTest} with caches created, updates
and destroyed using SQL DDL queries.
+ */
+public class ScanQueryConcurrentSqlUpdatesTest extends
ScanQueryConcurrentUpdatesAbstractTest {
+ /**
+ * A name for a cache that will be used to execute DDL queries.
+ */
+ private static final String DUMMY_CACHE_NAME = "dummy";
+
+ /** {@inheritDoc} */
+ @Override protected IgniteCache<Integer, Integer> createCache(String
cacheName, CacheMode cacheMode,
+ Duration
expiration) {
+ CacheConfiguration<Integer, Integer> cacheCfg = new
CacheConfiguration<>(cacheName);
+ cacheCfg.setCacheMode(cacheMode);
+ if (expiration != null) {
+
cacheCfg.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(expiration));
+ cacheCfg.setEagerTtl(true);
+ }
+
+ IgniteEx ignite = grid(0);
+ ignite.addCacheConfiguration(cacheCfg);
+
+ ignite.getOrCreateCache(DUMMY_CACHE_NAME).query(new
SqlFieldsQuery("CREATE TABLE " + cacheName + " " +
+ "(key int primary key, val int) " +
+ "WITH \"template=" + cacheName + ",wrap_value=false\""));
+
+ return ignite.cache("SQL_PUBLIC_" + cacheName.toUpperCase());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void updateCache(IgniteCache<Integer, Integer> cache,
int recordsNum) {
+ String tblName = tableName(cache);
+
+ for (int i = 0; i < recordsNum; i++) {
+ cache.query(new SqlFieldsQuery(
+ "INSERT INTO " + tblName + " (key, val) " +
+ "VALUES (" + i + ", " + i + ")"));
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void destroyCache(IgniteCache<Integer, Integer> cache)
{
+ grid(0).cache(DUMMY_CACHE_NAME).query(new SqlFieldsQuery("DROP TABLE "
+ tableName(cache)));
+ }
+
+ /**
+ * @param cache Cache to determine a table name for.
+ * @return Name of the table corresponding to the provided cache.
+ */
+ @SuppressWarnings("unchecked")
+ private String tableName(IgniteCache<Integer, Integer> cache) {
+ CacheConfiguration<Integer, Integer> cacheCfg =
+ (CacheConfiguration<Integer, Integer>)
cache.getConfiguration(CacheConfiguration.class);
+ QueryEntity qe = cacheCfg.getQueryEntities().iterator().next();
+
+ return qe.getTableName();
+ }
+}
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
index 09a2abf..457d2cd 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
@@ -183,6 +183,8 @@ import
org.apache.ignite.internal.processors.cache.query.IgniteCacheQueryCacheDe
import
org.apache.ignite.internal.processors.cache.query.IndexingSpiQuerySelfTest;
import
org.apache.ignite.internal.processors.cache.query.IndexingSpiQueryTxSelfTest;
import
org.apache.ignite.internal.processors.cache.query.IndexingSpiQueryWithH2IndexingSelfTest;
+import
org.apache.ignite.internal.processors.cache.query.ScanQueryConcurrentSqlUpdatesTest;
+import
org.apache.ignite.internal.processors.cache.query.ScanQueryConcurrentUpdatesTest;
import
org.apache.ignite.internal.processors.cache.transaction.DmlInsideTransactionTest;
import
org.apache.ignite.internal.processors.client.ClientConnectorConfigurationValidationSelfTest;
import
org.apache.ignite.internal.processors.database.baseline.IgniteStableBaselineBinObjFieldsQuerySelfTest;
@@ -536,6 +538,8 @@ import org.junit.runners.Suite;
IgniteCheckClusterStateBeforeExecuteQueryTest.class,
OptimizedMarshallerIndexNameTest.class,
SqlSystemViewsSelfTest.class,
+ ScanQueryConcurrentUpdatesTest.class,
+ ScanQueryConcurrentSqlUpdatesTest.class,
GridIndexRebuildSelfTest.class,
GridIndexRebuildTest.class,