This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 287eb66  IGNITE-12794 Fix "Unexpected row key" assertion during scan 
query - Fixes #7541.
287eb66 is described below

commit 287eb66ce4c838c054768cd90047d66309603ffe
Author: Denis Mekhanikov <[email protected]>
AuthorDate: Tue Oct 27 10:24:41 2020 +0300

    IGNITE-12794 Fix "Unexpected row key" assertion during scan query - Fixes 
#7541.
    
    Signed-off-by: Aleksey Plekhanov <[email protected]>
---
 .../processors/cache/GridCacheMapEntry.java        |   3 +-
 .../ScanQueryConcurrentUpdatesAbstractTest.java    | 210 +++++++++++++++++++++
 .../query/ScanQueryConcurrentUpdatesTest.java      |  54 ++++++
 .../query/ScanQueryConcurrentSqlUpdatesTest.java   |  87 +++++++++
 .../IgniteBinaryCacheQueryTestSuite.java           |   4 +
 5 files changed, 357 insertions(+), 1 deletion(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index de4edb0..b38b894 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -564,7 +564,8 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
             checkObsolete();
 
             if (isStartVersion() && ((flags & IS_UNSWAPPED_MASK) == 0)) {
-                assert row == null || row.key() == key : "Unexpected row key";
+                assert row == null || Objects.equals(row.key(), key) :
+                        "Unexpected row key [row.key=" + row.key() + ", 
cacheEntry.key=" + key + "]";
 
                 CacheDataRow read = row == null ? cctx.offheap().read(this) : 
row;
 
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/ScanQueryConcurrentUpdatesAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/ScanQueryConcurrentUpdatesAbstractTest.java
new file mode 100644
index 0000000..502a628
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/ScanQueryConcurrentUpdatesAbstractTest.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import javax.cache.Cache;
+import javax.cache.expiry.Duration;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A base for tests that check the behaviour of scan queries run on a data set 
that is modified concurrently.
+ * Actual tests should implement a way of cache creation, modification and 
destruction.
+ */
+public abstract class ScanQueryConcurrentUpdatesAbstractTest extends 
GridCommonAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGrids(4);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * Creates a cache with given parameters.
+     *
+     * @param cacheName Name of the cache.
+     * @param cacheMode Cache mode.
+     * @param expiration {@link Duration} for {@link 
javax.cache.expiry.ExpiryPolicy}. If {@code null}, then
+     * {@link javax.cache.expiry.ExpiryPolicy} won't be configured.
+     *
+     * @return Instance of the created cache.
+     */
+    protected abstract IgniteCache<Integer, Integer> createCache(String 
cacheName, CacheMode cacheMode,
+        Duration expiration);
+
+    /**
+     * Performs modification of a provided cache. Records with keys in range 
{@code 0..(recordsNum - 1)} are updated.
+     *
+     * @param cache Cache to update.
+     * @param recordsNum Number of records to update.
+     */
+    protected abstract void updateCache(IgniteCache<Integer, Integer> cache, 
int recordsNum);
+
+    /**
+     * Destroys the provided cache.
+     *
+     * @param cache Cache to destroy.
+     */
+    protected abstract void destroyCache(IgniteCache<Integer, Integer> cache);
+
+    /**
+     * Tests behaviour of scan queries with concurrent modification.
+     *
+     * @param cache Cache to test.
+     * @param recordsNum Number of records to load to the cache.
+     */
+    private void testStableDataset(IgniteCache<Integer, Integer> cache, int 
recordsNum) {
+        int iterations = 1000;
+
+        AtomicBoolean finished = new AtomicBoolean();
+
+        try {
+            updateCache(cache, recordsNum);
+            GridTestUtils.runAsync(() -> {
+                while (!finished.get())
+                    updateCache(cache, recordsNum);
+            });
+
+            for (int i = 0; i < iterations; i++) {
+                List<Cache.Entry<Integer, Integer>> res = cache.query(new 
ScanQuery<Integer, Integer>()).getAll();
+
+                assertEquals("Unexpected query result size.", recordsNum, 
res.size());
+
+                for (Cache.Entry<Integer, Integer> e : res)
+                    assertEquals(e.getKey(), e.getValue());
+            }
+        }
+        finally {
+            finished.set(true);
+            destroyCache(cache);
+        }
+    }
+
+    /**
+     * Tests behaviour of scan queries with entries expired and modified 
concurrently.
+     *
+     * @param cache Cache to test.
+     */
+    private void testExpiringDataset(IgniteCache<Integer, Integer> cache) {
+        int iterations = 100;
+        int recordsNum = 100;
+
+        try {
+            for (int i = 0; i < iterations; i++) {
+                updateCache(cache, recordsNum);
+
+                long updateTime = U.currentTimeMillis();
+
+                List<Cache.Entry<Integer, Integer>> res = cache.query(new 
ScanQuery<Integer, Integer>()).getAll();
+
+                assertTrue("Query result set is too big: " + res.size(), 
res.size() <= recordsNum);
+
+                for (Cache.Entry<Integer, Integer> e : res)
+                    assertEquals(e.getKey(), e.getValue());
+
+                while (U.currentTimeMillis() == updateTime)
+                    doSleep(10L);
+            }
+        }
+        finally {
+            destroyCache(cache);
+        }
+    }
+
+    /** */
+    @Test
+    public void testReplicatedOneRecordLongExpiry() {
+        testStableDataset(createCache("replicated_long_expiry",
+            CacheMode.REPLICATED, Duration.ONE_HOUR), 1);
+    }
+
+    /** */
+    @Test
+    public void testReplicatedManyRecordsLongExpiry() {
+        testStableDataset(createCache("replicated_long_expiry",
+            CacheMode.REPLICATED, Duration.ONE_HOUR), 1000);
+    }
+
+    /** */
+    @Test
+    public void testReplicatedOneRecordNoExpiry() {
+        testStableDataset(createCache("replicated_no_expiry",
+            CacheMode.REPLICATED, null), 1);
+    }
+
+    /** */
+    @Test
+    public void testReplicatedManyRecordsNoExpiry() {
+        testStableDataset(createCache("replicated_no_expiry",
+            CacheMode.REPLICATED, null), 1000);
+    }
+
+    /** */
+    @Test
+    public void testPartitionedOneRecordLongExpiry() {
+        testStableDataset(createCache("partitioned_long_expiry",
+            CacheMode.PARTITIONED, Duration.ONE_HOUR), 1);
+    }
+
+    /** */
+    @Test
+    public void testPartitionedManyRecordsLongExpiry() {
+        testStableDataset(createCache("partitioned_long_expiry",
+            CacheMode.PARTITIONED, Duration.ONE_HOUR), 1000);
+    }
+
+    /** */
+    @Test
+    public void testPartitionedOneRecordNoExpiry() {
+        testStableDataset(createCache("partitioned_no_expiry",
+            CacheMode.PARTITIONED, null), 1);
+    }
+
+    /** */
+    @Test
+    public void testPartitionedManyRecordsNoExpiry() {
+        testStableDataset(createCache("partitioned_no_expiry",
+            CacheMode.PARTITIONED, null), 1000);
+    }
+
+    /** */
+    @Test
+    public void testPartitionedShortExpiry() {
+        testExpiringDataset(createCache("partitioned_short_expiry",
+            CacheMode.PARTITIONED, new Duration(TimeUnit.MILLISECONDS, 1)));
+    }
+
+    /** */
+    @Test
+    public void testReplicatedShortExpiry() {
+        testExpiringDataset(createCache("partitioned_short_expiry",
+            CacheMode.REPLICATED, new Duration(TimeUnit.MILLISECONDS, 1)));
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/ScanQueryConcurrentUpdatesTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/ScanQueryConcurrentUpdatesTest.java
new file mode 100644
index 0000000..17dd603
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/ScanQueryConcurrentUpdatesTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+
+/**
+ * {@link ScanQueryConcurrentUpdatesAbstractTest} with caches created, updates 
and destroyed using Java API.
+ */
+public class ScanQueryConcurrentUpdatesTest extends 
ScanQueryConcurrentUpdatesAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteCache<Integer, Integer> createCache(String 
cacheName, CacheMode cacheMode,
+                                                                  Duration 
expiration) {
+        CacheConfiguration<Integer, Integer> cacheCfg = new 
CacheConfiguration<>(cacheName);
+        cacheCfg.setCacheMode(cacheMode);
+        if (expiration != null) {
+            
cacheCfg.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(expiration));
+            cacheCfg.setEagerTtl(true);
+        }
+
+        return grid(0).createCache(cacheCfg);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void updateCache(IgniteCache<Integer, Integer> cache, 
int recordsNum) {
+        for (int i = 0; i < recordsNum; i++)
+            cache.put(i, i);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void destroyCache(IgniteCache<Integer, Integer> cache) 
{
+        cache.destroy();
+    }
+}
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/query/ScanQueryConcurrentSqlUpdatesTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/query/ScanQueryConcurrentSqlUpdatesTest.java
new file mode 100644
index 0000000..bb67535
--- /dev/null
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/query/ScanQueryConcurrentSqlUpdatesTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+
+/**
+ * {@link ScanQueryConcurrentUpdatesAbstractTest} with caches created, updates 
and destroyed using SQL DDL queries.
+ */
+public class ScanQueryConcurrentSqlUpdatesTest extends 
ScanQueryConcurrentUpdatesAbstractTest {
+    /**
+     * A name for a cache that will be used to execute DDL queries.
+     */
+    private static final String DUMMY_CACHE_NAME = "dummy";
+
+    /** {@inheritDoc} */
+    @Override protected IgniteCache<Integer, Integer> createCache(String 
cacheName, CacheMode cacheMode,
+                                                                  Duration 
expiration) {
+        CacheConfiguration<Integer, Integer> cacheCfg = new 
CacheConfiguration<>(cacheName);
+        cacheCfg.setCacheMode(cacheMode);
+        if (expiration != null) {
+            
cacheCfg.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(expiration));
+            cacheCfg.setEagerTtl(true);
+        }
+
+        IgniteEx ignite = grid(0);
+        ignite.addCacheConfiguration(cacheCfg);
+
+        ignite.getOrCreateCache(DUMMY_CACHE_NAME).query(new 
SqlFieldsQuery("CREATE TABLE " + cacheName + " " +
+            "(key int primary key, val int) " +
+            "WITH \"template=" + cacheName + ",wrap_value=false\""));
+
+        return ignite.cache("SQL_PUBLIC_" + cacheName.toUpperCase());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void updateCache(IgniteCache<Integer, Integer> cache, 
int recordsNum) {
+        String tblName = tableName(cache);
+
+        for (int i = 0; i < recordsNum; i++) {
+            cache.query(new SqlFieldsQuery(
+                "INSERT INTO " + tblName + " (key, val) " +
+                "VALUES (" + i + ", " + i + ")"));
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void destroyCache(IgniteCache<Integer, Integer> cache) 
{
+        grid(0).cache(DUMMY_CACHE_NAME).query(new SqlFieldsQuery("DROP TABLE " 
+ tableName(cache)));
+    }
+
+    /**
+     * @param cache Cache to determine a table name for.
+     * @return Name of the table corresponding to the provided cache.
+     */
+    @SuppressWarnings("unchecked")
+    private String tableName(IgniteCache<Integer, Integer> cache) {
+        CacheConfiguration<Integer, Integer> cacheCfg =
+                (CacheConfiguration<Integer, Integer>) 
cache.getConfiguration(CacheConfiguration.class);
+        QueryEntity qe = cacheCfg.getQueryEntities().iterator().next();
+
+        return qe.getTableName();
+    }
+}
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
index 09a2abf..457d2cd 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
@@ -183,6 +183,8 @@ import 
org.apache.ignite.internal.processors.cache.query.IgniteCacheQueryCacheDe
 import 
org.apache.ignite.internal.processors.cache.query.IndexingSpiQuerySelfTest;
 import 
org.apache.ignite.internal.processors.cache.query.IndexingSpiQueryTxSelfTest;
 import 
org.apache.ignite.internal.processors.cache.query.IndexingSpiQueryWithH2IndexingSelfTest;
+import 
org.apache.ignite.internal.processors.cache.query.ScanQueryConcurrentSqlUpdatesTest;
+import 
org.apache.ignite.internal.processors.cache.query.ScanQueryConcurrentUpdatesTest;
 import 
org.apache.ignite.internal.processors.cache.transaction.DmlInsideTransactionTest;
 import 
org.apache.ignite.internal.processors.client.ClientConnectorConfigurationValidationSelfTest;
 import 
org.apache.ignite.internal.processors.database.baseline.IgniteStableBaselineBinObjFieldsQuerySelfTest;
@@ -536,6 +538,8 @@ import org.junit.runners.Suite;
     IgniteCheckClusterStateBeforeExecuteQueryTest.class,
     OptimizedMarshallerIndexNameTest.class,
     SqlSystemViewsSelfTest.class,
+    ScanQueryConcurrentUpdatesTest.class,
+    ScanQueryConcurrentSqlUpdatesTest.class,
 
     GridIndexRebuildSelfTest.class,
     GridIndexRebuildTest.class,

Reply via email to