This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new b9bab88  IGNITE-12536: Inconsistency between cache data and indexes 
when cache operation is interrupted.
b9bab88 is described below

commit b9bab88cbbbdb3ec250040fe1ded8948f3c75ae7
Author: tledkov <tled...@gridgain.com>
AuthorDate: Thu Jan 16 19:01:26 2020 +0300

    IGNITE-12536: Inconsistency between cache data and indexes when cache 
operation is interrupted.
---
 .../processors/query/h2/opt/GridH2Table.java       |  22 ++-
 ...encyAfterInterruptAtomicCacheOperationTest.java | 153 +++++++++++++++++++++
 ...sistencyAfterInterruptTxCacheOperationTest.java |  32 +++++
 .../SqlTwoCachesInGroupWithSameEntryTest.java      | 145 +++++++++++++++++++
 .../IgniteBinaryCacheQueryTestSuite2.java          |   7 +
 5 files changed, 355 insertions(+), 4 deletions(-)

diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
index 9bdf622..77441e5 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
@@ -492,7 +492,7 @@ public class GridH2Table extends TableBase {
         }
 
         // Acquire the lock.
-        lock(exclusive);
+        lock(exclusive, true);
 
         if (destroyed) {
             unlock(exclusive);
@@ -555,13 +555,27 @@ public class GridH2Table extends TableBase {
      *
      * @param exclusive Exclusive flag.
      */
-    @SuppressWarnings({"LockAcquiredButNotSafelyReleased", 
"CallToThreadYield"})
     private void lock(boolean exclusive) {
+        lock(exclusive, false);
+    }
+
+    /**
+     * Acquire table lock.
+     *
+     * @param exclusive Exclusive flag.
+     * @param interruptibly Acquires interruptibly lock or not interruplible 
lock flag.
+     */
+    @SuppressWarnings({"LockAcquiredButNotSafelyReleased", 
"CallToThreadYield"})
+    private void lock(boolean exclusive, boolean interruptibly) {
         Lock l = exclusive ? lock.writeLock() : lock.readLock();
 
         try {
-            if (!exclusive)
-                l.lockInterruptibly();
+            if (!exclusive) {
+                if (interruptibly)
+                    l.lockInterruptibly();
+                else
+                    l.lock();
+            }
             else {
                 for (;;) {
                     if (l.tryLock(200, TimeUnit.MILLISECONDS))
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlIndexConsistencyAfterInterruptAtomicCacheOperationTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlIndexConsistencyAfterInterruptAtomicCacheOperationTest.java
new file mode 100644
index 0000000..16d12a7
--- /dev/null
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlIndexConsistencyAfterInterruptAtomicCacheOperationTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import 
org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class SqlIndexConsistencyAfterInterruptAtomicCacheOperationTest extends 
AbstractIndexingCommonTest {
+    /** Keys count. */
+    private static final int KEYS = 1000;
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     */
+    protected CacheAtomicityMode atomicity() {
+        return CacheAtomicityMode.ATOMIC;
+    }
+
+    /**
+     * @throws Exception On error.
+     */
+    @Test
+    public void testPut() throws Exception {
+        IgniteEx ign = startGrid(0);
+
+        IgniteCache<Object, Object> cache = ign.createCache(new 
CacheConfiguration<>(DEFAULT_CACHE_NAME)
+            .setAtomicityMode(atomicity())
+            .setIndexedTypes(Integer.class, Integer.class));
+
+        Thread t = new Thread(() -> {
+            cache.put(1, 1);
+        });
+
+        t.start();
+
+        t.interrupt();
+
+        t.join();
+
+        assertEquals(cache.size(), cache.query(new SqlFieldsQuery("select * 
from Integer")).getAll().size());
+    }
+
+    /**
+     * @throws Exception On error.
+     */
+    @Test
+    public void testPutAll() throws Exception {
+        IgniteEx ign = startGrid(0);
+
+        IgniteCache<Object, Object> cache = ign.createCache(new 
CacheConfiguration<>(DEFAULT_CACHE_NAME)
+            .setAtomicityMode(atomicity())
+            .setIndexedTypes(Integer.class, Integer.class));
+
+        final Map<Integer, Integer> batch = new HashMap<>();
+
+        for (int i = 0; i < KEYS; ++i)
+            batch.put(i, i);
+
+        Thread t = new Thread(() -> {
+            cache.putAll(batch);
+        });
+
+        t.start();
+        t.interrupt();
+        t.join();
+
+        assertEquals(cache.size(), cache.query(new SqlFieldsQuery("select * 
from Integer")).getAll().size());
+    }
+
+    /**
+     * @throws Exception On error.
+     */
+    @Test
+    public void testRemove() throws Exception {
+        IgniteEx ign = startGrid(0);
+
+        IgniteCache<Object, Object> cache = ign.createCache(new 
CacheConfiguration<>(DEFAULT_CACHE_NAME)
+            .setAtomicityMode(atomicity())
+            .setIndexedTypes(Integer.class, Integer.class));
+
+        cache.put(1, 1);
+
+        Thread t = new Thread(() -> {
+            cache.remove(1);
+        });
+
+        t.start();
+
+        t.interrupt();
+
+        t.join();
+
+        assertEquals(cache.size(), cache.query(new SqlFieldsQuery("select * 
from Integer")).getAll().size());
+    }
+
+    /**
+     * @throws Exception On error.
+     */
+    @Test
+    public void testRemoveAll() throws Exception {
+        IgniteEx ign = startGrid(0);
+
+        IgniteCache<Object, Object> cache = ign.createCache(new 
CacheConfiguration<>(DEFAULT_CACHE_NAME)
+            .setAtomicityMode(atomicity())
+            .setIndexedTypes(Integer.class, Integer.class));
+
+        final Map<Integer, Integer> batch = new HashMap<>();
+
+        for (int i = 0; i < KEYS; ++i)
+            batch.put(i, i);
+
+        cache.putAll(batch);
+
+        Thread t = new Thread(() -> {
+            cache.removeAll(batch.keySet());
+        });
+
+        t.start();
+        t.interrupt();
+        t.join();
+
+        assertEquals(cache.size(), cache.query(new SqlFieldsQuery("select * 
from Integer")).getAll().size());
+    }
+}
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlIndexConsistencyAfterInterruptTxCacheOperationTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlIndexConsistencyAfterInterruptTxCacheOperationTest.java
new file mode 100644
index 0000000..36228f7
--- /dev/null
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlIndexConsistencyAfterInterruptTxCacheOperationTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+
+/**
+ *
+ */
+public class SqlIndexConsistencyAfterInterruptTxCacheOperationTest
+    extends SqlIndexConsistencyAfterInterruptAtomicCacheOperationTest {
+    /**
+     */
+    protected CacheAtomicityMode atomicity() {
+        return CacheAtomicityMode.TRANSACTIONAL;
+    }
+}
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlTwoCachesInGroupWithSameEntryTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlTwoCachesInGroupWithSameEntryTest.java
new file mode 100644
index 0000000..8f41009
--- /dev/null
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlTwoCachesInGroupWithSameEntryTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import 
org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
+import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ *
+ */
+@RunWith(Parameterized.class)
+public class SqlTwoCachesInGroupWithSameEntryTest extends 
AbstractIndexingCommonTest {
+    /** Keys count. */
+    private static final int KEYS = 50_000;
+
+    /**
+     * Test's parameters.
+     */
+    @Parameterized.Parameters(name = "persistence={0}, useOnlyPkHash={1}")
+    public static Iterable<Object[]> params() {
+        return Arrays.asList(
+            new Object[] {true, true},
+            new Object[] {true, false},
+            new Object[] {false, true},
+            new Object[] {false, false}
+        );
+    }
+
+    /** Enable persistence for the test. */
+    @Parameterized.Parameter(0)
+    public boolean persistenceEnabled;
+
+    /** Disable H2Tree indexes. */
+    @Parameterized.Parameter(1)
+    public boolean useOnlyPkHashIndex;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        if (persistenceEnabled)
+            cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setDataStorageConfiguration(new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(new 
DataRegionConfiguration()
+                .setPersistenceEnabled(persistenceEnabled)));
+    }
+
+    /**
+     * @throws Exception On error.
+     */
+    @SuppressWarnings("unchecked")
+    @Test
+    public void test() throws Exception {
+        IgniteEx ign = startGrid(0);
+
+        ign.cluster().active(true);
+
+        IgniteCache cache0 = ign.createCache(new CacheConfiguration<>("cache0")
+            .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+            .setGroupName("grp0")
+            .setSqlSchema("CACHE0")
+            .setIndexedTypes(Integer.class, Integer.class));
+
+        IgniteCache cache1 = ign.createCache(new CacheConfiguration<>("cache1")
+            .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+            .setGroupName("grp0")
+            .setSqlSchema("CACHE1")
+            .setIndexedTypes(Integer.class, Integer.class));
+
+        for (int i = 0; i < KEYS; ++i) {
+            cache0.put(i, i);
+            cache1.put(i, i);
+        }
+
+        if (useOnlyPkHashIndex) {
+            for (GridH2Table t : 
((IgniteH2Indexing)grid(0).context().query().getIndexing()).schemaManager().dataTables())
+                GridTestUtils.setFieldValue(t, "rebuildFromHashInProgress", 1);
+        }
+
+        assertEquals(KEYS, cache0.size());
+        assertEquals(KEYS, cache1.size());
+        assertEquals(KEYS, sql("select * FROM 
cache0.Integer").getAll().size());
+        assertEquals(KEYS, sql("select * FROM 
cache1.Integer").getAll().size());
+
+        cache0.clear();
+
+        assertEquals(0, cache0.size());
+        assertEquals(KEYS, cache1.size());
+        assertEquals(0, sql("select * FROM cache0.Integer").getAll().size());
+        assertEquals(KEYS, sql("select * FROM 
cache1.Integer").getAll().size());
+    }
+
+    /**
+     * @param sql SQL query.
+     * @param args Query parameters.
+     * @return Results cursor.
+     */
+    private FieldsQueryCursor<List<?>> sql(String sql, Object ... args) {
+        return grid(0).context().query().querySqlFields(new SqlFieldsQuery(sql)
+            .setArgs(args), false);
+    }
+}
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java
 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java
index c676896..35b57b9 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java
@@ -58,9 +58,12 @@ import 
org.apache.ignite.internal.processors.query.IgniteCacheGroupsSqlSegmented
 import 
org.apache.ignite.internal.processors.query.IgniteSqlCreateTableTemplateTest;
 import org.apache.ignite.internal.processors.query.LocalQueryLazyTest;
 import org.apache.ignite.internal.processors.query.LongRunningQueryTest;
+import 
org.apache.ignite.internal.processors.query.SqlIndexConsistencyAfterInterruptAtomicCacheOperationTest;
+import 
org.apache.ignite.internal.processors.query.SqlIndexConsistencyAfterInterruptTxCacheOperationTest;
 import 
org.apache.ignite.internal.processors.query.SqlLocalQueryConnectionAndStatementTest;
 import 
org.apache.ignite.internal.processors.query.SqlPartOfComplexPkLookupTest;
 import 
org.apache.ignite.internal.processors.query.SqlQueriesTopologyMappingTest;
+import 
org.apache.ignite.internal.processors.query.SqlTwoCachesInGroupWithSameEntryTest;
 import 
org.apache.ignite.internal.processors.query.h2.CacheQueryEntityWithDateTimeApiFieldsTest;
 import 
org.apache.ignite.internal.processors.query.h2.DmlStatementsProcessorTest;
 import 
org.apache.ignite.internal.processors.query.h2.twostep.CacheQueryMemoryLeakTest;
@@ -81,6 +84,10 @@ import org.junit.runners.Suite;
 @Suite.SuiteClasses({
     SqlCacheStartStopTest.class,
 
+    SqlIndexConsistencyAfterInterruptAtomicCacheOperationTest.class,
+    SqlIndexConsistencyAfterInterruptTxCacheOperationTest.class,
+    SqlTwoCachesInGroupWithSameEntryTest.class,
+
     // Dynamic index create/drop tests.
     DynamicIndexPartitionedAtomicConcurrentSelfTest.class,
     DynamicIndexPartitionedTransactionalConcurrentSelfTest.class,

Reply via email to