Repository: ignite Updated Branches: refs/heads/master dde348607 -> f7aa443ba
IGNITE-6572: SQL: allowed many cache to share the same schema. This closes #2850. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ee6daae5 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ee6daae5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ee6daae5 Branch: refs/heads/master Commit: ee6daae5363bef96f3794cc8f6325fcdab275ec1 Parents: 1816fb9 Author: Denis Mekhanikov <[email protected]> Authored: Wed Oct 25 14:44:51 2017 +0300 Committer: devozerov <[email protected]> Committed: Wed Oct 25 14:44:51 2017 +0300 ---------------------------------------------------------------------- .../internal/processors/query/h2/H2Schema.java | 21 +++++ .../processors/query/h2/IgniteH2Indexing.java | 39 +++++++--- .../query/IgniteSqlSchemaIndexingTest.java | 30 +++++++- .../processors/query/SqlSchemaSelfTest.java | 80 ++++++++++++++++++-- 4 files changed, 150 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ee6daae5/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Schema.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Schema.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Schema.java index f5cf0f2..2fdf32d 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Schema.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Schema.java @@ -34,6 +34,9 @@ public class H2Schema { /** */ private final ConcurrentMap<H2TypeKey, H2TableDescriptor> typeToTbl = new ConcurrentHashMap<>(); + /** Usage count. */ + private int usageCnt; + /** * Constructor. 
* @@ -51,6 +54,24 @@ public class H2Schema { } /** + * Increments counter for number of caches having this schema. + * + * @return New value of caches counter. + */ + public int incrementUsageCount() { + return ++usageCnt; + } + + /** + * Decrements counter for number of caches having this schema. + * + * @return New value of caches counter. + */ + public int decrementUsageCount() { + return --usageCnt; + } + + /** * @return Tables. */ public Collection<H2TableDescriptor> tables() { http://git-wip-us.apache.org/repos/asf/ignite/blob/ee6daae5/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index eed1f19..9321c85 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -259,6 +259,9 @@ public class IgniteH2Indexing implements GridQueryIndexing { private GridSpinBusyLock busyLock; /** */ + private final Object schemaMux = new Object(); + + /** */ private final ConcurrentMap<Long, GridRunningQueryInfo> runs = new ConcurrentHashMap8<>(); /** */ @@ -2286,10 +2289,18 @@ public class IgniteH2Indexing implements GridQueryIndexing { @Override public void registerCache(String cacheName, String schemaName, GridCacheContext<?, ?> cctx) throws IgniteCheckedException { if (!isDefaultSchema(schemaName)) { - if (schemas.putIfAbsent(schemaName, new H2Schema(schemaName)) != null) - throw new IgniteCheckedException("Schema already registered: " + U.maskName(schemaName)); + synchronized (schemaMux) { + H2Schema schema = new H2Schema(schemaName); + + H2Schema oldSchema = 
schemas.putIfAbsent(schemaName, schema); + + if (oldSchema == null) + createSchema(schemaName); + else + schema = oldSchema; - createSchema(schemaName); + schema.incrementUsageCount(); + } } cacheName2schema.put(cacheName, schemaName); @@ -2301,9 +2312,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { @Override public void unregisterCache(String cacheName, boolean destroy) { String schemaName = schema(cacheName); - boolean dflt = isDefaultSchema(schemaName); - - H2Schema schema = dflt ? schemas.get(schemaName) : schemas.remove(schemaName); + H2Schema schema = schemas.get(schemaName); if (schema != null) { mapQryExec.onCacheStop(cacheName); @@ -2334,12 +2343,18 @@ public class IgniteH2Indexing implements GridQueryIndexing { } } - if (!dflt) { - try { - dropSchema(schemaName); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to drop schema on cache stop (will ignore): " + cacheName, e); + if (!isDefaultSchema(schemaName)) { + synchronized (schemaMux) { + if (schema.decrementUsageCount() == 0) { + schemas.remove(schemaName); + + try { + dropSchema(schemaName); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to drop schema on cache stop (will ignore): " + cacheName, e); + } + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/ee6daae5/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSchemaIndexingTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSchemaIndexingTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSchemaIndexingTest.java index 33e35e0..570d2db 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSchemaIndexingTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSchemaIndexingTest.java @@ -105,7 +105,35 @@ public 
class IgniteSqlSchemaIndexingTest extends GridCommonAbstractTest { return null; } - }, IgniteException.class, "Schema already registered: "); + }, IgniteException.class, "Duplicate index name"); + } + + /** + * Test collision of table names in different caches, sharing a single SQL schema. + * + * @throws Exception If failed. + */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + public void testCustomSchemaMultipleCachesTablesCollision() throws Exception { + //TODO: Rewrite with dynamic cache creation, and GRID start in #beforeTest after resolve of + //TODO: https://issues.apache.org/jira/browse/IGNITE-1094 + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + final CacheConfiguration cfg = cacheConfig("cache1", true, Integer.class, Fact.class) + .setSqlSchema("TEST_SCHEMA"); + + final CacheConfiguration collisionCfg = cacheConfig("cache2", true, Integer.class, Fact.class) + .setSqlSchema("TEST_SCHEMA"); + + IgniteConfiguration icfg = new IgniteConfiguration() + .setLocalHost("127.0.0.1") + .setCacheConfiguration(cfg, collisionCfg); + + Ignition.start(icfg); + + return null; + } + }, IgniteException.class, "Failed to register query type"); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/ee6daae5/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlSchemaSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlSchemaSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlSchemaSelfTest.java index 183a884..a4ee2e3 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlSchemaSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlSchemaSelfTest.java @@ -18,6 +18,9 @@ package org.apache.ignite.internal.processors.query; import 
java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.QueryEntity; import org.apache.ignite.cache.query.SqlFieldsQuery; @@ -25,11 +28,9 @@ import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import java.util.Iterator; -import java.util.List; - /** * Tests for schemas. */ @@ -197,6 +198,63 @@ public class SqlSchemaSelfTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testCustomSchemaName() throws Exception { + IgniteCache<Long, Person> cache = registerQueryEntity("Person", CACHE_PERSON); + + testQueryEntity(cache, "Person"); + } + + /** + * Test multiple caches having the same schema. + * + * @throws Exception If failed. + */ + public void testCustomSchemaMultipleCaches() throws Exception { + for (int i = 1; i <= 3; i++) { + String tbl = "Person" + i; + + IgniteCache<Long, Person> cache = registerQueryEntity(tbl, "PersonCache" + i); + + testQueryEntity(cache, tbl); + } + + for (int i = 1; i < 3; i++) { + IgniteCache<Long, Person> cache = node.cache("PersonCache" + i); + + testQueryEntity(cache, "Person" + i); + } + } + + /** + * Test concurrent schema creation and destruction. + * + * @throws Exception If failed. 
+ */ + public void testCustomSchemaConcurrentUse() throws Exception { + final AtomicInteger maxIdx = new AtomicInteger(); + + GridTestUtils.runMultiThreaded(new Runnable() { + @Override public void run() { + for (int i = 0; i < 100; i++) { + int idx = maxIdx.incrementAndGet(); + + String tbl = "Person" + idx; + + IgniteCache<Long, Person> cache = registerQueryEntity(tbl, "PersonCache" + idx); + + testQueryEntity(cache, tbl); + + cache.destroy(); + } + } + }, 4, "schema-test"); + } + + /** + * @param tbl Table name. + * @param cacheName Cache name. + * @return Cache with registered query entity. + */ + private IgniteCache<Long, Person> registerQueryEntity(String tbl, String cacheName) { QueryEntity qe = new QueryEntity() .setValueType(Person.class.getName()) .setKeyType(Long.class.getName()) @@ -207,17 +265,25 @@ public class SqlSchemaSelfTest extends GridCommonAbstractTest { .addQueryField("name", String.class.getName(), null) .addQueryField("orgId", Long.class.getName(), null); - qe.setTableName("Person"); + qe.setTableName(tbl); - IgniteCache<Long, Person> cache = node.createCache(new CacheConfiguration<Long, Person>() - .setName(CACHE_PERSON) + return node.createCache(new CacheConfiguration<Long, Person>() + .setName(cacheName) .setQueryEntities(Collections.singletonList(qe)) .setSqlSchema("TEST")); + } + /** + * Uses SQL to retrieve data from cache. + * + * @param cache Cache. + * @param tbl Table. + */ + private void testQueryEntity(IgniteCache<Long, Person> cache, String tbl) { cache.put(1L, new Person("Vasya", 2)); assertEquals(1, node.context().query().querySqlFieldsNoCache( - new SqlFieldsQuery("SELECT id, name, orgId FROM TEST.Person where (id = ?)").setArgs(1L), false + new SqlFieldsQuery(String.format("SELECT id, name, orgId FROM TEST.%s where (id = %d)", tbl, 1)), false ).getAll().size()); }
