This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new b9bab88 IGNITE-12536: Inconsistency between cache data and indexes when cache operation is interrupted. b9bab88 is described below commit b9bab88cbbbdb3ec250040fe1ded8948f3c75ae7 Author: tledkov <tled...@gridgain.com> AuthorDate: Thu Jan 16 19:01:26 2020 +0300 IGNITE-12536: Inconsistency between cache data and indexes when cache operation is interrupted. --- .../processors/query/h2/opt/GridH2Table.java | 22 ++- ...encyAfterInterruptAtomicCacheOperationTest.java | 153 +++++++++++++++++++++ ...sistencyAfterInterruptTxCacheOperationTest.java | 32 +++++ .../SqlTwoCachesInGroupWithSameEntryTest.java | 145 +++++++++++++++++++ .../IgniteBinaryCacheQueryTestSuite2.java | 7 + 5 files changed, 355 insertions(+), 4 deletions(-) diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java index 9bdf622..77441e5 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java @@ -492,7 +492,7 @@ public class GridH2Table extends TableBase { } // Acquire the lock. - lock(exclusive); + lock(exclusive, true); if (destroyed) { unlock(exclusive); @@ -555,13 +555,27 @@ public class GridH2Table extends TableBase { * * @param exclusive Exclusive flag. */ - @SuppressWarnings({"LockAcquiredButNotSafelyReleased", "CallToThreadYield"}) private void lock(boolean exclusive) { + lock(exclusive, false); + } + + /** + * Acquire table lock. + * + * @param exclusive Exclusive flag. + * @param interruptibly Acquires interruptibly lock or not interruplible lock flag. + */ + @SuppressWarnings({"LockAcquiredButNotSafelyReleased", "CallToThreadYield"}) + private void lock(boolean exclusive, boolean interruptibly) { Lock l = exclusive ? lock.writeLock() : lock.readLock(); try { - if (!exclusive) - l.lockInterruptibly(); + if (!exclusive) { + if (interruptibly) + l.lockInterruptibly(); + else + l.lock(); + } else { for (;;) { if (l.tryLock(200, TimeUnit.MILLISECONDS)) diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlIndexConsistencyAfterInterruptAtomicCacheOperationTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlIndexConsistencyAfterInterruptAtomicCacheOperationTest.java new file mode 100644 index 0000000..16d12a7 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlIndexConsistencyAfterInterruptAtomicCacheOperationTest.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query; + +import java.util.HashMap; +import java.util.Map; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest; +import org.junit.Test; + +/** + * + */ +public class SqlIndexConsistencyAfterInterruptAtomicCacheOperationTest extends AbstractIndexingCommonTest { + /** Keys count. */ + private static final int KEYS = 1000; + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + */ + protected CacheAtomicityMode atomicity() { + return CacheAtomicityMode.ATOMIC; + } + + /** + * @throws Exception On error. + */ + @Test + public void testPut() throws Exception { + IgniteEx ign = startGrid(0); + + IgniteCache<Object, Object> cache = ign.createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME) + .setAtomicityMode(atomicity()) + .setIndexedTypes(Integer.class, Integer.class)); + + Thread t = new Thread(() -> { + cache.put(1, 1); + }); + + t.start(); + + t.interrupt(); + + t.join(); + + assertEquals(cache.size(), cache.query(new SqlFieldsQuery("select * from Integer")).getAll().size()); + } + + /** + * @throws Exception On error. + */ + @Test + public void testPutAll() throws Exception { + IgniteEx ign = startGrid(0); + + IgniteCache<Object, Object> cache = ign.createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME) + .setAtomicityMode(atomicity()) + .setIndexedTypes(Integer.class, Integer.class)); + + final Map<Integer, Integer> batch = new HashMap<>(); + + for (int i = 0; i < KEYS; ++i) + batch.put(i, i); + + Thread t = new Thread(() -> { + cache.putAll(batch); + }); + + t.start(); + t.interrupt(); + t.join(); + + assertEquals(cache.size(), cache.query(new SqlFieldsQuery("select * from Integer")).getAll().size()); + } + + /** + * @throws Exception On error. + */ + @Test + public void testRemove() throws Exception { + IgniteEx ign = startGrid(0); + + IgniteCache<Object, Object> cache = ign.createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME) + .setAtomicityMode(atomicity()) + .setIndexedTypes(Integer.class, Integer.class)); + + cache.put(1, 1); + + Thread t = new Thread(() -> { + cache.remove(1); + }); + + t.start(); + + t.interrupt(); + + t.join(); + + assertEquals(cache.size(), cache.query(new SqlFieldsQuery("select * from Integer")).getAll().size()); + } + + /** + * @throws Exception On error. + */ + @Test + public void testRemoveAll() throws Exception { + IgniteEx ign = startGrid(0); + + IgniteCache<Object, Object> cache = ign.createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME) + .setAtomicityMode(atomicity()) + .setIndexedTypes(Integer.class, Integer.class)); + + final Map<Integer, Integer> batch = new HashMap<>(); + + for (int i = 0; i < KEYS; ++i) + batch.put(i, i); + + cache.putAll(batch); + + Thread t = new Thread(() -> { + cache.removeAll(batch.keySet()); + }); + + t.start(); + t.interrupt(); + t.join(); + + assertEquals(cache.size(), cache.query(new SqlFieldsQuery("select * from Integer")).getAll().size()); + } +} diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlIndexConsistencyAfterInterruptTxCacheOperationTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlIndexConsistencyAfterInterruptTxCacheOperationTest.java new file mode 100644 index 0000000..36228f7 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlIndexConsistencyAfterInterruptTxCacheOperationTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query; + +import org.apache.ignite.cache.CacheAtomicityMode; + +/** + * + */ +public class SqlIndexConsistencyAfterInterruptTxCacheOperationTest + extends SqlIndexConsistencyAfterInterruptAtomicCacheOperationTest { + /** + */ + protected CacheAtomicityMode atomicity() { + return CacheAtomicityMode.TRANSACTIONAL; + } +} diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlTwoCachesInGroupWithSameEntryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlTwoCachesInGroupWithSameEntryTest.java new file mode 100644 index 0000000..8f41009 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlTwoCachesInGroupWithSameEntryTest.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query; + +import java.util.Arrays; +import java.util.List; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.query.FieldsQueryCursor; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest; +import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; +import org.apache.ignite.testframework.GridTestUtils; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * + */ +@RunWith(Parameterized.class) +public class SqlTwoCachesInGroupWithSameEntryTest extends AbstractIndexingCommonTest { + /** Keys count. */ + private static final int KEYS = 50_000; + + /** + * Test's parameters. + */ + @Parameterized.Parameters(name = "persistence={0}, useOnlyPkHash={1}") + public static Iterable<Object[]> params() { + return Arrays.asList( + new Object[] {true, true}, + new Object[] {true, false}, + new Object[] {false, true}, + new Object[] {false, false} + ); + } + + /** Enable persistence for the test. */ + @Parameterized.Parameter(0) + public boolean persistenceEnabled; + + /** Disable H2Tree indexes. */ + @Parameterized.Parameter(1) + public boolean useOnlyPkHashIndex; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + if (persistenceEnabled) + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + super.afterTest(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + return super.getConfiguration(igniteInstanceName) + .setDataStorageConfiguration(new DataStorageConfiguration() + .setDefaultDataRegionConfiguration(new DataRegionConfiguration() + .setPersistenceEnabled(persistenceEnabled))); + } + + /** + * @throws Exception On error. + */ + @SuppressWarnings("unchecked") + @Test + public void test() throws Exception { + IgniteEx ign = startGrid(0); + + ign.cluster().active(true); + + IgniteCache cache0 = ign.createCache(new CacheConfiguration<>("cache0") + .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) + .setGroupName("grp0") + .setSqlSchema("CACHE0") + .setIndexedTypes(Integer.class, Integer.class)); + + IgniteCache cache1 = ign.createCache(new CacheConfiguration<>("cache1") + .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) + .setGroupName("grp0") + .setSqlSchema("CACHE1") + .setIndexedTypes(Integer.class, Integer.class)); + + for (int i = 0; i < KEYS; ++i) { + cache0.put(i, i); + cache1.put(i, i); + } + + if (useOnlyPkHashIndex) { + for (GridH2Table t : ((IgniteH2Indexing)grid(0).context().query().getIndexing()).schemaManager().dataTables()) + GridTestUtils.setFieldValue(t, "rebuildFromHashInProgress", 1); + } + + assertEquals(KEYS, cache0.size()); + assertEquals(KEYS, cache1.size()); + assertEquals(KEYS, sql("select * FROM cache0.Integer").getAll().size()); + assertEquals(KEYS, sql("select * FROM cache1.Integer").getAll().size()); + + cache0.clear(); + + assertEquals(0, cache0.size()); + assertEquals(KEYS, cache1.size()); + assertEquals(0, sql("select * FROM cache0.Integer").getAll().size()); + assertEquals(KEYS, sql("select * FROM cache1.Integer").getAll().size()); + } + + /** + * @param sql SQL query. + * @param args Query parameters. + * @return Results cursor. + */ + private FieldsQueryCursor<List<?>> sql(String sql, Object ... args) { + return grid(0).context().query().querySqlFields(new SqlFieldsQuery(sql) + .setArgs(args), false); + } +} diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java index c676896..35b57b9 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java @@ -58,9 +58,12 @@ import org.apache.ignite.internal.processors.query.IgniteCacheGroupsSqlSegmented import org.apache.ignite.internal.processors.query.IgniteSqlCreateTableTemplateTest; import org.apache.ignite.internal.processors.query.LocalQueryLazyTest; import org.apache.ignite.internal.processors.query.LongRunningQueryTest; +import org.apache.ignite.internal.processors.query.SqlIndexConsistencyAfterInterruptAtomicCacheOperationTest; +import org.apache.ignite.internal.processors.query.SqlIndexConsistencyAfterInterruptTxCacheOperationTest; import org.apache.ignite.internal.processors.query.SqlLocalQueryConnectionAndStatementTest; import org.apache.ignite.internal.processors.query.SqlPartOfComplexPkLookupTest; import org.apache.ignite.internal.processors.query.SqlQueriesTopologyMappingTest; +import org.apache.ignite.internal.processors.query.SqlTwoCachesInGroupWithSameEntryTest; import org.apache.ignite.internal.processors.query.h2.CacheQueryEntityWithDateTimeApiFieldsTest; import org.apache.ignite.internal.processors.query.h2.DmlStatementsProcessorTest; import org.apache.ignite.internal.processors.query.h2.twostep.CacheQueryMemoryLeakTest; @@ -81,6 +84,10 @@ import org.junit.runners.Suite; @Suite.SuiteClasses({ SqlCacheStartStopTest.class, + SqlIndexConsistencyAfterInterruptAtomicCacheOperationTest.class, + SqlIndexConsistencyAfterInterruptTxCacheOperationTest.class, + SqlTwoCachesInGroupWithSameEntryTest.class, + // Dynamic index create/drop tests. DynamicIndexPartitionedAtomicConcurrentSelfTest.class, DynamicIndexPartitionedTransactionalConcurrentSelfTest.class,