IGNITE-5380: Added SQL query entity conflicts validation in discovery thread. This closes #2082.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/de658caf Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/de658caf Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/de658caf Branch: refs/heads/ignite-5293 Commit: de658caf8e21971659ba601bdf7132de32a793d1 Parents: ae4d073 Author: Alexander Paschenko <[email protected]> Authored: Mon Jun 5 21:34:25 2017 +0300 Committer: devozerov <[email protected]> Committed: Mon Jun 5 21:34:25 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/ClusterCachesInfo.java | 11 ++ .../processors/cache/GridCacheUtils.java | 3 + .../processors/query/GridQueryProcessor.java | 12 ++- .../internal/processors/query/QueryUtils.java | 44 ++++++++ .../query/h2/ddl/DdlStatementsProcessor.java | 18 +++- .../cache/index/H2DynamicTableSelfTest.java | 106 +++++++++++++++---- 6 files changed, 173 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/de658caf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java index 7ff5622..e4d2668 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java @@ -40,6 +40,8 @@ import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.query.QuerySchema; +import org.apache.ignite.internal.processors.query.QueryUtils; +import org.apache.ignite.internal.processors.query.schema.SchemaOperationException; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -281,6 +283,15 @@ class ClusterCachesInfo { "client cache (a cache with the given name is not started): " + req.cacheName())); } else { + SchemaOperationException err = QueryUtils.checkQueryEntityConflicts( + req.startCacheConfiguration(), ctx.cache().cacheDescriptors()); + + if (err != null) { + ctx.cache().completeCacheStartFuture(req, false, err); + + continue; + } + CacheConfiguration<?, ?> ccfg = req.startCacheConfiguration(); assert req.cacheType() != null : req; http://git-wip-us.apache.org/repos/asf/ignite/blob/de658caf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index 4dc5f8e..27dd23d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -64,6 +64,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.igfs.IgfsUtils; +import org.apache.ignite.internal.processors.query.schema.SchemaOperationException; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.util.lang.IgniteInClosureX; import org.apache.ignite.internal.util.typedef.C1; @@ -1271,6 +1272,8 @@ public class GridCacheUtils { return new CacheAtomicUpdateTimeoutException(e.getMessage(), e); else if (e instanceof ClusterTopologyServerNotFoundException) return new CacheServerNotFoundException(e.getMessage(), e); + else if (e instanceof SchemaOperationException) + return new CacheException(e.getMessage(), e); if (e.getCause() instanceof CacheException) return (CacheException)e.getCause(); http://git-wip-us.apache.org/repos/asf/ignite/blob/de658caf/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index e729af5..175bcea 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -1316,7 +1316,17 @@ public class GridQueryProcessor extends GridProcessorAdapter { ccfg.setSqlEscapeAll(true); ccfg.setQueryEntities(Collections.singleton(entity)); - boolean res = ctx.grid().getOrCreateCache0(ccfg, true).get2(); + boolean res; + + try { + res = ctx.grid().getOrCreateCache0(ccfg, true).get2(); + } + catch (CacheException e) { + if (e.getCause() instanceof SchemaOperationException) + throw (SchemaOperationException)e.getCause(); + else + throw e; + } if (!res && !ifNotExists) throw new SchemaOperationException(SchemaOperationException.CODE_TABLE_EXISTS, entity.getTableName()); http://git-wip-us.apache.org/repos/asf/ignite/blob/de658caf/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java index 748768e..6ac2390 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java @@ -27,6 +27,7 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.processors.cache.CacheObjectContext; +import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDefaultAffinityKeyMapper; import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; @@ -1022,6 +1023,49 @@ public class QueryUtils { } /** + * Check given {@link CacheConfiguration} for conflicts in table and index names from any query entities + * found in collection of {@link DynamicCacheDescriptor}s and belonging to the same schema. + * + * @param ccfg New cache configuration. + * @param descs Cache descriptors. + * @return Exception message describing found conflict or {@code null} if none found. + */ + public static SchemaOperationException checkQueryEntityConflicts(CacheConfiguration<?, ?> ccfg, + Collection<DynamicCacheDescriptor> descs) { + String schema = QueryUtils.normalizeSchemaName(ccfg.getName(), ccfg.getSqlSchema()); + + Set<String> idxNames = new HashSet<>(); + + Set<String> tblNames = new HashSet<>(); + + for (DynamicCacheDescriptor desc : descs) { + String descSchema = QueryUtils.normalizeSchemaName(desc.cacheName(), + desc.cacheConfiguration().getSqlSchema()); + + if (!F.eq(schema, descSchema)) + continue; + + for (QueryEntity e : desc.schema().entities()) { + tblNames.add(e.getTableName()); + + for (QueryIndex idx : e.getIndexes()) + idxNames.add(idx.getName()); + } + } + + for (QueryEntity e : ccfg.getQueryEntities()) { + if (!tblNames.add(e.getTableName())) + return new SchemaOperationException(SchemaOperationException.CODE_TABLE_EXISTS, e.getTableName()); + + for (QueryIndex idx : e.getIndexes()) + if (!idxNames.add(idx.getName())) + return new SchemaOperationException(SchemaOperationException.CODE_INDEX_EXISTS, idx.getName()); + } + + return null; + } + + /** * Validate query entity. * * @param entity Entity. http://git-wip-us.apache.org/repos/asf/ignite/blob/de658caf/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java index c9e3295..fb2129b 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java @@ -26,6 +26,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.QueryEntity; import org.apache.ignite.cache.QueryIndex; import org.apache.ignite.cache.query.FieldsQueryCursor; +import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.cache.QueryCursorImpl; @@ -85,7 +86,7 @@ public class DdlStatementsProcessor { * @param sql SQL. * @param stmt H2 statement to parse and execute. */ - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"}) public FieldsQueryCursor<List<?>> runDdlStatement(String sql, PreparedStatement stmt) throws IgniteCheckedException { assert stmt instanceof JdbcPreparedStatement; @@ -162,7 +163,20 @@ public class DdlStatementsProcessor { cmd.tableName()); } else { - ctx.query().dynamicTableCreate(cmd.schemaName(), toQueryEntity(cmd), cmd.templateName(), + QueryEntity e = toQueryEntity(cmd); + + CacheConfiguration<?, ?> ccfg = new CacheConfiguration<>(cmd.tableName()); + + ccfg.setQueryEntities(Collections.singleton(e)); + ccfg.setSqlSchema(cmd.schemaName()); + + SchemaOperationException err = + QueryUtils.checkQueryEntityConflicts(ccfg, ctx.cache().cacheDescriptors()); + + if (err != null) + throw err; + + ctx.query().dynamicTableCreate(cmd.schemaName(), e, cmd.templateName(), cmd.atomicityMode(), cmd.backups(), cmd.ifNotExists()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/de658caf/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicTableSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicTableSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicTableSelfTest.java index d97d1eb..1d376c1 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicTableSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicTableSelfTest.java @@ -20,17 +20,22 @@ package org.apache.ignite.internal.processors.cache.index; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.LinkedHashMap; import java.util.List; import java.util.concurrent.Callable; import javax.cache.CacheException; + +import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.Ignition; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.QueryIndex; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; @@ -42,8 +47,11 @@ import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.internal.processors.query.QueryTypeDescriptorImpl; import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; +import org.apache.ignite.internal.processors.query.h2.ddl.DdlStatementsProcessor; import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; +import org.apache.ignite.internal.processors.query.schema.SchemaOperationException; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.testframework.GridTestUtils; /** @@ -89,6 +97,9 @@ public class H2DynamicTableSelfTest extends AbstractSchemaSelfTest { if (client().cache("Person") != null) executeDdl("DROP TABLE IF EXISTS PUBLIC.\"Person\""); + + executeDdl("DROP TABLE IF EXISTS PUBLIC.\"City\""); + super.afterTest(); } @@ -376,37 +387,68 @@ public class H2DynamicTableSelfTest extends AbstractSchemaSelfTest { } /** - * Execute {@code CREATE TABLE} w/given params. - * @param params Engine parameters. + * Tests index name conflict check in discovery thread. + * @throws Exception if failed. */ - private void createTableWithParams(final String params) { - cache().query(new SqlFieldsQuery("CREATE TABLE \"Person\" (\"id\" int, \"city\" varchar" + - ", \"name\" varchar, \"surname\" varchar, \"age\" int, PRIMARY KEY (\"id\", \"city\")) WITH " + - "\"template=cache," + params + '"')); + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + public void testIndexNameConflictCheckDiscovery() throws Exception { + executeDdl(grid(0), "CREATE TABLE \"Person\" (id int primary key, name varchar)"); + + executeDdl(grid(0), "CREATE INDEX \"idx\" ON \"Person\" (\"name\")"); + + GridTestUtils.assertThrows(null, new Callable<Object>() { + @Override public Object call() throws Exception { + QueryEntity e = new QueryEntity(); + + e.setTableName("City"); + e.setKeyFields(Collections.singleton("name")); + e.setFields(new LinkedHashMap<>(Collections.singletonMap("name", String.class.getName()))); + e.setIndexes(Collections.singleton(new QueryIndex("name").setName("idx"))); + e.setValueType("CityKey"); + e.setValueType("City"); + + queryProcessor(client()).dynamicTableCreate("PUBLIC", e, CacheMode.PARTITIONED.name(), + CacheAtomicityMode.ATOMIC, 10, false); + + return null; + } + }, SchemaOperationException.class, "Index already exists: idx"); } /** - * Execute {@code CREATE TABLE} w/given params expecting a particular error. - * @param params Engine parameters. - * @param expErrMsg Expected error message. + * Tests table name conflict check in {@link DdlStatementsProcessor}. + * @throws Exception if failed. */ - private void assertCreateTableWithParamsThrows(final String params, String expErrMsg) { + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + public void testTableNameConflictCheckSql() throws Exception { + executeDdl(grid(0), "CREATE TABLE \"Person\" (id int primary key, name varchar)"); + GridTestUtils.assertThrows(null, new Callable<Object>() { - @Override public Object call() throws Exception { - createTableWithParams(params); + @Override public Object call() throws Exception { + executeDdl(client(), "CREATE TABLE \"Person\" (id int primary key, name varchar)"); return null; } - }, IgniteSQLException.class, expErrMsg); + }, IgniteSQLException.class, "Table already exists: Person"); } /** - * Test that {@code CREATE TABLE} on non-public schema causes an exception. + * Execute {@code CREATE TABLE} w/given params. + * @param params Engine parameters. + */ + private void createTableWithParams(final String params) { + executeDdl("CREATE TABLE \"Person\" (\"id\" int, \"city\" varchar" + + ", \"name\" varchar, \"surname\" varchar, \"age\" int, PRIMARY KEY (\"id\", \"city\")) WITH " + + "\"template=cache," + params + '"'); + } + + /** + * Test that {@code CREATE TABLE} in non-public schema causes an exception. * * @throws Exception if failed. */ @SuppressWarnings("ThrowableResultOfMethodCallIgnored") - public void testCreateTableNotPublicSchema() throws Exception { + public void testCreateTableInNonPublicSchema() throws Exception { GridTestUtils.assertThrows(null, new Callable<Object>() { @Override public Object call() throws Exception { executeDdl("CREATE TABLE \"cache_idx\".\"Person\" (\"id\" int, \"city\" varchar," + @@ -419,6 +461,21 @@ public class H2DynamicTableSelfTest extends AbstractSchemaSelfTest { } /** + * Execute {@code CREATE TABLE} w/given params expecting a particular error. + * @param params Engine parameters. + * @param expErrMsg Expected error message. + */ + private void assertCreateTableWithParamsThrows(final String params, String expErrMsg) { + GridTestUtils.assertThrows(null, new Callable<Object>() { + @Override public Object call() throws Exception { + createTableWithParams(params); + + return null; + } + }, IgniteSQLException.class, expErrMsg); + } + + /** * Test that {@code DROP TABLE} on non-public schema causes an exception. * * @throws Exception if failed. @@ -435,12 +492,12 @@ public class H2DynamicTableSelfTest extends AbstractSchemaSelfTest { } /** - * Execute DDL statement. + * Execute DDL statement on client node. * * @param sql Statement. */ private void executeDdl(String sql) { - queryProcessor(client()).querySqlFieldsNoCache(new SqlFieldsQuery(sql).setSchema("PUBLIC"), true); + executeDdl(client(), sql); } /** @@ -514,6 +571,16 @@ public class H2DynamicTableSelfTest extends AbstractSchemaSelfTest { } /** + * Execute DDL statement on given node. + * + * @param node Node. + * @param sql Statement. + */ + private void executeDdl(Ignite node, String sql) { + queryProcessor(node).querySqlFieldsNoCache(new SqlFieldsQuery(sql).setSchema("PUBLIC"), true); + } + + /** * @return Client node. */ private IgniteEx client() { @@ -562,6 +629,9 @@ public class H2DynamicTableSelfTest extends AbstractSchemaSelfTest { * @return Cache configuration with query entities in {@code PUBLIC} schema. */ private CacheConfiguration cacheConfigurationForIndexingInPublicSchema() { - return cacheConfigurationForIndexing().setName(INDEXED_CACHE_NAME_2).setSqlSchema(QueryUtils.DFLT_SCHEMA); + return cacheConfigurationForIndexing() + .setName(INDEXED_CACHE_NAME_2) + .setSqlSchema(QueryUtils.DFLT_SCHEMA) + .setNodeFilter(F.not(new DynamicIndexAbstractSelfTest.NodeFilter())); } }
