IGNITE-6316: SQL: fixed CacheConfiguration persistence logic for DDL operations. This closes #2701.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/37b4b1d1 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/37b4b1d1 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/37b4b1d1 Branch: refs/heads/ignite-3478 Commit: 37b4b1d1444e849ef1c74201f2d69e7a250d6c2d Parents: f8ae8f9 Author: Alexander Paschenko <alexander.a.pasche...@gmail.com> Authored: Thu Sep 21 18:51:18 2017 +0300 Committer: devozerov <ppoze...@gmail.com> Committed: Thu Sep 21 18:51:18 2017 +0300 ---------------------------------------------------------------------- .../pagemem/store/IgnitePageStoreManager.java | 4 +- .../cache/CacheAffinitySharedManager.java | 4 +- .../persistence/file/FilePageStoreManager.java | 7 +- .../processors/query/GridQueryProcessor.java | 30 ++- .../pagemem/NoOpPageStoreManager.java | 2 +- .../IgnitePersistentStoreSchemaLoadTest.java | 237 ++++++++++++------- 6 files changed, 184 insertions(+), 100 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/37b4b1d1/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java index eaa85ad..64c5927 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java @@ -20,7 +20,6 @@ package org.apache.ignite.internal.pagemem.store; import java.nio.ByteBuffer; import java.util.Map; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor; @@ -185,9 +184,10 @@ public interface IgnitePageStoreManager extends GridCacheSharedManager, IgniteCh /** * @param cacheData Cache configuration. + * @param overwrite Whether stored configuration should be overwritten if it exists. * @throws IgniteCheckedException If failed. */ - public void storeCacheData(StoredCacheData cacheData) throws IgniteCheckedException; + public void storeCacheData(StoredCacheData cacheData, boolean overwrite) throws IgniteCheckedException; /** * @param grpId Cache group ID. * @return {@code True} if index store for given cache group existed before node started. http://git-wip-us.apache.org/repos/asf/ignite/blob/37b4b1d1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index beb7a7d..741e204 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -2596,8 +2596,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (cctx.pageStore() != null && cctx.database().persistenceEnabled() && !cctx.kernalContext().clientNode()) { try { cctx.pageStore().storeCacheData( - new StoredCacheData(cfg) - ); + new StoredCacheData(cfg), + false); } catch (IgniteCheckedException e) { U.error(log(), "Error while saving cache configuration on disk, cfg = " + cfg, e); http://git-wip-us.apache.org/repos/asf/ignite/blob/37b4b1d1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java index d60151a..b3eb74a 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java @@ -207,9 +207,7 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen } /** {@inheritDoc} */ - @Override public void storeCacheData( - StoredCacheData cacheData - ) throws IgniteCheckedException { + @Override public void storeCacheData(StoredCacheData cacheData, boolean overwrite) throws IgniteCheckedException { File cacheWorkDir = cacheWorkDirectory(cacheData.config()); File file; @@ -222,10 +220,11 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen else file = new File(cacheWorkDir, CACHE_DATA_FILENAME); - if (!file.exists() || file.length() == 0) { + if (overwrite || !file.exists() || file.length() == 0) { try { file.createNewFile(); + // Pre-existing file will be truncated upon stream open. try (OutputStream stream = new BufferedOutputStream(new FileOutputStream(file))) { marshaller.marshal(cacheData, stream); } http://git-wip-us.apache.org/repos/asf/ignite/blob/37b4b1d1/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index e791101..51d3411 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -64,7 +64,9 @@ import org.apache.ignite.internal.processors.cache.CacheObjectContext; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.StoredCacheData; import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture; import org.apache.ignite.internal.processors.cache.query.CacheQueryType; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; @@ -503,8 +505,11 @@ public class GridQueryProcessor extends GridProcessorAdapter { if (!msg.hasError()) { DynamicCacheDescriptor cacheDesc = ctx.cache().cacheDescriptor(msg.operation().cacheName()); - if (cacheDesc != null && F.eq(cacheDesc.deploymentId(), proposeMsg.deploymentId())) + if (cacheDesc != null && F.eq(cacheDesc.deploymentId(), proposeMsg.deploymentId())) { cacheDesc.schemaChangeFinish(msg); + + saveCacheConfiguration(cacheDesc); + } } // Propose message will be used from exchange thread to @@ -2504,6 +2509,29 @@ private IgniteInternalFuture<Object> rebuildIndexesFromHash(@Nullable final Stri ", sndNodeId=" + msg.senderNodeId() + ']'); } } + /** + * @param desc cache descriptor. + */ + private void saveCacheConfiguration(DynamicCacheDescriptor desc) { + GridCacheSharedContext cctx = ctx.cache().context(); + + if (cctx.pageStore() != null && cctx.database().persistenceEnabled() && !cctx.kernalContext().clientNode()) { + CacheConfiguration cfg = desc.cacheConfiguration(); + + try { + StoredCacheData data = new StoredCacheData(cfg); + + if (desc.schema() != null) + data.queryEntities(desc.schema().entities()); + + cctx.pageStore().storeCacheData(data, true); + } + catch (IgniteCheckedException e) { + U.error(log, "Error while saving cache configuration on disk, cfg = " + cfg, e); + } + } + } + /** * Unwind pending messages for particular operation. http://git-wip-us.apache.org/repos/asf/ignite/blob/37b4b1d1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpPageStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpPageStoreManager.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpPageStoreManager.java index 8fc2bdb..40887e8c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpPageStoreManager.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpPageStoreManager.java @@ -172,7 +172,7 @@ public class NoOpPageStoreManager implements IgnitePageStoreManager { } /** {@inheritDoc} */ - @Override public void storeCacheData(StoredCacheData cacheData) throws IgniteCheckedException { + @Override public void storeCacheData(StoredCacheData cacheData, boolean overwrite) throws IgniteCheckedException { // No-op. } http://git-wip-us.apache.org/repos/asf/ignite/blob/37b4b1d1/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/IgnitePersistentStoreSchemaLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/IgnitePersistentStoreSchemaLoadTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/IgnitePersistentStoreSchemaLoadTest.java index 349f062..b4c08b2 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/IgnitePersistentStoreSchemaLoadTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/IgnitePersistentStoreSchemaLoadTest.java @@ -18,30 +18,27 @@ package org.apache.ignite.internal.processors.database; import java.io.Serializable; -import java.util.Arrays; -import java.util.LinkedHashMap; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.CountDownLatch; +import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheRebalanceMode; import org.apache.ignite.cache.QueryEntity; -import org.apache.ignite.cache.QueryIndex; -import org.apache.ignite.cache.QueryIndexType; +import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.PersistentStoreConfiguration; import org.apache.ignite.internal.IgniteEx; -import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.persistence.DbCheckpointListener; import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; -import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.processors.query.QuerySchema; +import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; -import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK; @@ -53,21 +50,17 @@ public class IgnitePersistentStoreSchemaLoadTest extends GridCommonAbstractTest /** */ private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - /** Index name. */ - private static final String IDX_NAME = "my_idx"; - /** Cache name. */ private static final String TMPL_NAME = "test_cache*"; /** Table name. */ private static final String TBL_NAME = Person.class.getSimpleName(); - /** Schema name. */ - private static final String SCHEMA_NAME = "PUBLIC"; - - /** Cache name. */ - private static final String CACHE_NAME = TBL_NAME; + /** Name of the cache created with {@code CREATE TABLE}. */ + private static final String SQL_CACHE_NAME = QueryUtils.createTableCacheName(QueryUtils.DFLT_SCHEMA, TBL_NAME); + /** Name of the cache created upon cluster start. */ + private static final String STATIC_CACHE_NAME = TBL_NAME; /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { @@ -83,9 +76,31 @@ public class IgnitePersistentStoreSchemaLoadTest extends GridCommonAbstractTest cfg.setPersistentStoreConfiguration(pCfg); + cfg.setActiveOnStart(true); + return cfg; } + /** + * Create node configuration with a cache pre-configured. + * @param gridName Node name. + * @return Node configuration with a cache pre-configured. + * @throws Exception if failed. + */ + @SuppressWarnings("unchecked") + private IgniteConfiguration getConfigurationWithStaticCache(String gridName) throws Exception { + IgniteConfiguration cfg = getConfiguration(gridName); + + CacheConfiguration ccfg = cacheCfg(STATIC_CACHE_NAME); + + ccfg.setIndexedTypes(Integer.class, Person.class); + ccfg.setSqlEscapeAll(true); + + cfg.setCacheConfiguration(ccfg); + + return optimize(cfg); + } + /** */ private CacheConfiguration cacheCfg(String name) { CacheConfiguration<?, ?> cfg = new CacheConfiguration<>(); @@ -99,20 +114,6 @@ public class IgnitePersistentStoreSchemaLoadTest extends GridCommonAbstractTest return cfg; } - /** */ - private QueryEntity getEntity() { - LinkedHashMap<String, String> fields = new LinkedHashMap<>(); - - fields.put("id", Integer.class.getName()); - fields.put("name", String.class.getName()); - - QueryEntity entity = new QueryEntity(Integer.class.getName(), Person.class.getName()); - entity.setFields(fields); - entity.setTableName(TBL_NAME); - - return entity; - } - /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { System.setProperty(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK, "true"); @@ -132,98 +133,112 @@ public class IgnitePersistentStoreSchemaLoadTest extends GridCommonAbstractTest } /** */ - public void testPersistIndex() throws Exception { - IgniteEx ig0 = startGrid(0); - startGrid(1); + public void testDynamicSchemaChangesPersistence() throws Exception { + checkSchemaStateAfterNodeRestart(false); + } - final AtomicInteger cnt = new AtomicInteger(); + /** */ + public void testDynamicSchemaChangesPersistenceWithAliveCluster() throws Exception { + checkSchemaStateAfterNodeRestart(true); + } - GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)ig0.context().cache().context().database(); + /** */ + @SuppressWarnings("unchecked") + public void testDynamicSchemaChangesPersistenceWithStaticCache() throws Exception { + IgniteEx node = startGrid(getConfigurationWithStaticCache(getTestIgniteInstanceName(0))); - db.addCheckpointListener(new DbCheckpointListener() { - @Override public void onCheckpointBegin(Context context) { - cnt.incrementAndGet(); - } - }); + node.active(true); - QueryIndex idx = new QueryIndex("name"); + IgniteCache cache = node.cache(STATIC_CACHE_NAME); - idx.setName(IDX_NAME); + assertNotNull(cache); - ig0.context().query().dynamicTableCreate(SCHEMA_NAME, getEntity(), TMPL_NAME, null, null, null, - null, 1, true); + CountDownLatch cnt = checkpointLatch(node); - assert indexCnt(ig0, CACHE_NAME) == 0; + assertEquals(0, indexCnt(node, STATIC_CACHE_NAME)); - ig0.context().query().dynamicIndexCreate(CACHE_NAME, SCHEMA_NAME, TBL_NAME, idx, false).get(); + makeDynamicSchemaChanges(node, STATIC_CACHE_NAME); - assert indexCnt(ig0, CACHE_NAME) == 1; + checkDynamicSchemaChanges(node, STATIC_CACHE_NAME); - waitForCheckpoint(cnt); + cnt.await(); - stopGrid(1); + stopGrid(0); - IgniteEx ig1 = startGrid(1); + // Restarting with no-cache configuration - otherwise stored configurations + // will be ignored due to cache names duplication. + node = startGrid(0); - assert indexCnt(ig1, CACHE_NAME) == 1; - } + node.active(true); - /** */ - public void testPersistCompositeIndex() throws Exception { - IgniteEx ig0 = startGrid(0); - startGrid(1); + checkDynamicSchemaChanges(node, STATIC_CACHE_NAME); + } - final AtomicInteger cnt = new AtomicInteger(); + /** + * Perform test with cache created with {@code CREATE TABLE}. + * @param aliveCluster Whether there should remain an alive node when tested node is restarted. + * @throws Exception if failed. + */ + private void checkSchemaStateAfterNodeRestart(boolean aliveCluster) throws Exception { + IgniteEx node = startGrid(0); - GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)ig0.context().cache().context().database(); + node.active(true); - db.addCheckpointListener(new DbCheckpointListener() { - @Override public void onCheckpointBegin(Context context) { - cnt.incrementAndGet(); - } - }); + if (aliveCluster) + startGrid(1); - ig0.context().query().dynamicTableCreate(SCHEMA_NAME, getEntity(), TMPL_NAME, null, null, null, null, 1, true); + CountDownLatch cnt = checkpointLatch(node); - assert indexCnt(ig0, CACHE_NAME) == 0; + node.context().query().querySqlFieldsNoCache( + new SqlFieldsQuery("create table \"Person\" (\"id\" int primary key, \"name\" varchar)"), false).getAll(); - QueryIndex idx = new QueryIndex(Arrays.asList("id", "name"), QueryIndexType.SORTED); + assertEquals(0, indexCnt(node, SQL_CACHE_NAME)); - idx.setName(IDX_NAME); + makeDynamicSchemaChanges(node, QueryUtils.DFLT_SCHEMA); - ig0.context().query().dynamicIndexCreate(CACHE_NAME, SCHEMA_NAME, TBL_NAME, idx, false).get(); + checkDynamicSchemaChanges(node, SQL_CACHE_NAME); - assert indexCnt(ig0, CACHE_NAME) == 1; + cnt.await(); - waitForCheckpoint(cnt); + stopGrid(0); - stopGrid(1); + node = startGrid(0); - IgniteEx ig1 = startGrid(1); + node.active(true); - assert indexCnt(ig1, CACHE_NAME) == 1; + checkDynamicSchemaChanges(node, SQL_CACHE_NAME); } /** */ - private void waitForCheckpoint(final AtomicInteger cnt) throws IgniteInterruptedCheckedException { - final int i = cnt.get(); + private int indexCnt(IgniteEx node, String cacheName) { + DynamicCacheDescriptor desc = node.context().cache().cacheDescriptor(cacheName); + + int cnt = 0; - GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - return cnt.get() > i; + if (desc != null) { + QuerySchema schema = desc.schema(); + if (schema != null) { + for (QueryEntity entity : schema.entities()) + cnt += entity.getIndexes().size(); } - }, 2000); + } + return cnt; } /** */ - private int indexCnt(IgniteEx node, String cacheName) { - + private int colsCnt(IgniteEx node, String cacheName) { DynamicCacheDescriptor desc = node.context().cache().cacheDescriptor(cacheName); int cnt = 0; - for (QueryEntity entity : desc.schema().entities()) - cnt += entity.getIndexes().size(); + if (desc != null) { + QuerySchema schema = desc.schema(); + if (schema != null) { + + for (QueryEntity entity : schema.entities()) + cnt += entity.getFields().size(); + } + } return cnt; } @@ -236,6 +251,50 @@ public class IgnitePersistentStoreSchemaLoadTest extends GridCommonAbstractTest } /** + * @param node Node whose checkpoint to wait for. + * @return Latch released when checkpoint happens. + */ + private CountDownLatch checkpointLatch(IgniteEx node) { + final CountDownLatch cnt = new CountDownLatch(1); + + GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)node.context().cache().context().database(); + + db.addCheckpointListener(new DbCheckpointListener() { + @Override public void onCheckpointBegin(Context ctx) { + cnt.countDown(); + } + }); + + return cnt; + } + + /** + * Create dynamic index and column. + * @param node Node. + * @param schema Schema name. + */ + private void makeDynamicSchemaChanges(IgniteEx node, String schema) { + node.context().query().querySqlFieldsNoCache( + new SqlFieldsQuery("create index \"my_idx\" on \"Person\" (\"id\", \"name\")").setSchema(schema), false) + .getAll(); + + node.context().query().querySqlFieldsNoCache( + new SqlFieldsQuery("alter table \"Person\" add column \"age\" int").setSchema(schema), false) + .getAll(); + } + + /** + * Check that dynamically created schema objects are in place. + * @param node Node. + * @param cacheName Cache name. + */ + private void checkDynamicSchemaChanges(IgniteEx node, String cacheName) { + assertEquals(1, indexCnt(node, cacheName)); + + assertEquals(3, colsCnt(node, cacheName)); + } + + /** * */ protected static class Person implements Serializable { @@ -269,21 +328,19 @@ public class IgnitePersistentStoreSchemaLoadTest extends GridCommonAbstractTest if (o == null || getClass() != o.getClass()) return false; - IgnitePersistentStoreSchemaLoadTest.Person person = (IgnitePersistentStoreSchemaLoadTest.Person)o; + IgnitePersistentStoreSchemaLoadTest.Person person = (IgnitePersistentStoreSchemaLoadTest.Person) o; - if (id != person.id) - return false; + return id == person.id && (name != null ? name.equals(person.name) : person.name == null); - return name != null ? name.equals(person.name) : person.name == null; } /** {@inheritDoc} */ @Override public int hashCode() { - int result = id; + int res = id; - result = 31 * result + (name != null ? name.hashCode() : 0); + res = 31 * res + (name != null ? name.hashCode() : 0); - return result; + return res; } } }