IGNITE-5317: Added method to execute SQL fields query without concrete cache. This closes #2024.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c45de168 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c45de168 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c45de168 Branch: refs/heads/ignite-5075 Commit: c45de1681110e42b88c84d82507b8bc9286182ec Parents: 0feadac Author: devozerov <[email protected]> Authored: Wed May 31 10:36:13 2017 +0300 Committer: devozerov <[email protected]> Committed: Wed May 31 10:36:13 2017 +0300 ---------------------------------------------------------------------- .../ignite/internal/jdbc2/JdbcConnection.java | 26 +- .../processors/query/GridQueryIndexing.java | 71 +++-- .../processors/query/GridQueryProcessor.java | 161 ++++++---- .../query/h2/DmlStatementsProcessor.java | 45 +-- .../processors/query/h2/IgniteH2Indexing.java | 306 +++++++------------ .../h2/GridIndexingSpiAbstractSelfTest.java | 121 ++------ 6 files changed, 327 insertions(+), 403 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/c45de168/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java index ee8b605..9385d7d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java @@ -56,8 +56,12 @@ import org.apache.ignite.compute.ComputeTaskTimeoutException; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.IgnitionEx; 
import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; +import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; +import org.apache.ignite.internal.processors.query.GridQueryIndexing; +import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.internal.processors.resource.GridSpringResourceContext; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.F; @@ -544,10 +548,24 @@ public class JdbcConnection implements Connection { if (!stream) stmt = new JdbcPreparedStatement(this, sql); else { + GridQueryIndexing idx = ignite().context().query().getIndexing(); + PreparedStatement nativeStmt = prepareNativeStatement(sql); - IgniteDataStreamer<?, ?> streamer = ((IgniteEx) ignite).context().query().createStreamer(cacheName, - nativeStmt, streamFlushTimeout, streamNodeBufSize, streamNodeParOps, streamAllowOverwrite); + if (!idx.isInsertStatement(nativeStmt)) + throw new IgniteSQLException("Only INSERT operations are supported in streaming mode", + IgniteQueryErrorCode.UNSUPPORTED_OPERATION); + + IgniteDataStreamer streamer = ignite().dataStreamer(cacheName); + + streamer.autoFlushFrequency(streamFlushTimeout); + streamer.allowOverwrite(streamAllowOverwrite); + + if (streamNodeBufSize > 0) + streamer.perNodeBufferSize(streamNodeBufSize); + + if (streamNodeParOps > 0) + streamer.perNodeParallelOperations(streamNodeParOps); stmt = new JdbcStreamedPreparedStatement(this, sql, streamer, nativeStmt); } @@ -736,8 +754,8 @@ public class JdbcConnection implements Connection { /** * @return Ignite node. 
*/ - Ignite ignite() { - return ignite; + IgniteKernal ignite() { + return (IgniteKernal)ignite; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/c45de168/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java index 9d66c0a..4429058 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java @@ -66,77 +66,79 @@ public interface GridQueryIndexing { /** * Parses SQL query into two step query and executes it. * - * @param cctx Cache context. + * @param schemaName Schema name. * @param qry Query. * @param keepBinary Keep binary flag. + * @param mainCacheId Main cache ID. * @return Cursor. * @throws IgniteCheckedException If failed. */ - public <K, V> QueryCursor<Cache.Entry<K, V>> queryDistributedSql(GridCacheContext<?,?> cctx, SqlQuery qry, - boolean keepBinary) throws IgniteCheckedException; + public <K, V> QueryCursor<Cache.Entry<K, V>> queryDistributedSql(String schemaName, SqlQuery qry, + boolean keepBinary, int mainCacheId) throws IgniteCheckedException; /** * Parses SQL query into two step query and executes it. * - * @param cctx Cache context. + * @param schemaName Schema name. * @param qry Query. * @param keepBinary Keep binary flag. * @param cancel Query cancel. + * @param mainCacheId Main cache ID. * @return Cursor. * @throws IgniteCheckedException If failed. 
*/ - public FieldsQueryCursor<List<?>> queryDistributedSqlFields(GridCacheContext<?, ?> cctx, SqlFieldsQuery qry, - boolean keepBinary, GridQueryCancel cancel) throws IgniteCheckedException; + public FieldsQueryCursor<List<?>> queryDistributedSqlFields(String schemaName, SqlFieldsQuery qry, + boolean keepBinary, GridQueryCancel cancel, @Nullable Integer mainCacheId) throws IgniteCheckedException; /** * Perform a MERGE statement using data streamer as receiver. * - * @param cacheName Cache name. + * @param schemaName Schema name. * @param qry Query. * @param params Query parameters. * @param streamer Data streamer to feed data to. * @return Query result. * @throws IgniteCheckedException If failed. */ - public long streamUpdateQuery(String cacheName, String qry, @Nullable Object[] params, + public long streamUpdateQuery(String schemaName, String qry, @Nullable Object[] params, IgniteDataStreamer<?, ?> streamer) throws IgniteCheckedException; /** * Executes regular query. * - * @param cctx Cache context. + * @param schemaName Schema name. * @param qry Query. * @param filter Cache name and key filter. * @param keepBinary Keep binary flag. * @return Cursor. */ - public <K, V> QueryCursor<Cache.Entry<K,V>> queryLocalSql(GridCacheContext<?, ?> cctx, SqlQuery qry, + public <K, V> QueryCursor<Cache.Entry<K,V>> queryLocalSql(String schemaName, SqlQuery qry, IndexingQueryFilter filter, boolean keepBinary) throws IgniteCheckedException; /** * Queries individual fields (generally used by JDBC drivers). * - * @param cctx Cache context. + * @param schemaName Schema name. * @param qry Query. * @param keepBinary Keep binary flag. * @param filter Cache name and key filter. * @param cancel Query cancel. * @return Cursor. 
*/ - public FieldsQueryCursor<List<?>> queryLocalSqlFields(GridCacheContext<?, ?> cctx, SqlFieldsQuery qry, + public FieldsQueryCursor<List<?>> queryLocalSqlFields(String schemaName, SqlFieldsQuery qry, boolean keepBinary, IndexingQueryFilter filter, GridQueryCancel cancel) throws IgniteCheckedException; /** * Executes text query. * - * @param cacheName Cache name. + * @param schemaName Schema name. * @param qry Text query. * @param typeName Type name. * @param filter Cache name and key filter. * @return Queried rows. * @throws IgniteCheckedException If failed. */ - public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalText(String cacheName, String qry, + public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalText(String schemaName, String qry, String typeName, IndexingQueryFilter filter) throws IgniteCheckedException; /** @@ -196,11 +198,11 @@ public interface GridQueryIndexing { /** * Unregisters type and removes all corresponding data. * - * @param cacheName Cache name. + * @param schemaName Schema name. * @param typeName Type name. * @throws IgniteCheckedException If failed. */ - public void unregisterType(String cacheName, String typeName) throws IgniteCheckedException; + public void unregisterType(String schemaName, String typeName) throws IgniteCheckedException; /** * Updates index. Note that key is unique for cache, so if cache contains multiple indexes @@ -231,19 +233,21 @@ public interface GridQueryIndexing { /** * Rebuilds all indexes of given type from hash index. * - * @param cacheName Cache name. - * @param type Type descriptor. + * @param cctx Cache context. + * @param schemaName Schema name. + * @param typeName Type name. * @throws IgniteCheckedException If failed. 
*/ - public void rebuildIndexesFromHash(String cacheName, GridQueryTypeDescriptor type) throws IgniteCheckedException; + public void rebuildIndexesFromHash(GridCacheContext cctx, String schemaName, String typeName) + throws IgniteCheckedException; /** * Marks all indexes of given type for rebuild from hash index, making them unusable until rebuild finishes. * * @param cacheName Cache name. - * @param type Type descriptor. + * @param typeName Type name. */ - public void markForRebuildFromHash(String cacheName, GridQueryTypeDescriptor type); + public void markForRebuildFromHash(String cacheName, String typeName); /** * Returns backup filter. @@ -264,11 +268,11 @@ public interface GridQueryIndexing { /** * Prepare native statement to retrieve JDBC metadata from. * - * @param cacheName Cache name. + * @param schemaName Schema name. * @param sql Query. * @return {@link PreparedStatement} from underlying engine to supply metadata to Prepared - most likely H2. */ - public PreparedStatement prepareNativeStatement(String cacheName, String sql) throws SQLException; + public PreparedStatement prepareNativeStatement(String schemaName, String sql) throws SQLException; /** * Gets cache name from database schema. @@ -299,15 +303,18 @@ public interface GridQueryIndexing { public void cancelAllQueries(); /** - * @param cacheName Cache name. + * Gets database schema from cache name. + * + * @param cacheName Cache name. {@code null} would be converted to an empty string. + * @return Schema name. Should not be null since we should not fail for an invalid cache name. + */ + public String schema(String cacheName); + + /** + * Check if passed statement is an insert statement. + * * @param nativeStmt Native statement. - * @param autoFlushFreq Automatic data flushing frequency, disabled if {@code 0}. 
- * @param nodeBufSize Per node buffer size - see {@link IgniteDataStreamer#perNodeBufferSize(int)} - * @param nodeParOps Per node parallel ops count - see {@link IgniteDataStreamer#perNodeParallelOperations(int)} - * @param allowOverwrite Overwrite existing cache values on key duplication. - * @return {@link IgniteDataStreamer} tailored to specific needs of given native statement based on its metadata; - * {@code null} if given statement is a query. + * @return {@code True} if insert. */ - public IgniteDataStreamer<?,?> createStreamer(String cacheName, PreparedStatement nativeStmt, long autoFlushFreq, - int nodeBufSize, int nodeParOps, boolean allowOverwrite); + public boolean isInsertStatement(PreparedStatement nativeStmt); } http://git-wip-us.apache.org/repos/asf/ignite/blob/c45de168/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 320c90b..990226e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -44,11 +44,9 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.QueryCursorImpl; import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture; import org.apache.ignite.internal.processors.cache.query.CacheQueryType; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; -import 
org.apache.ignite.internal.processors.cache.query.QueryCursorEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor; import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorImpl; @@ -1508,10 +1506,8 @@ public class GridQueryProcessor extends GridProcessorAdapter { * @param desc Type descriptor. * @return Future that will be completed when rebuilding of all indexes is finished. */ - private IgniteInternalFuture<Object> rebuildIndexesFromHash( - @Nullable final String cacheName, - @Nullable final QueryTypeDescriptorImpl desc - ) { + private IgniteInternalFuture<Object> rebuildIndexesFromHash(@Nullable final String cacheName, + @Nullable final QueryTypeDescriptorImpl desc) { if (idx == null) return new GridFinishedFuture<>(new IgniteCheckedException("Indexing is disabled.")); @@ -1520,12 +1516,19 @@ public class GridQueryProcessor extends GridProcessorAdapter { final GridWorkerFuture<Object> fut = new GridWorkerFuture<>(); - idx.markForRebuildFromHash(cacheName, desc); + final String schemaName = idx.schema(cacheName); + final String typeName = desc.name(); + + idx.markForRebuildFromHash(schemaName, typeName); GridWorker w = new GridWorker(ctx.igniteInstanceName(), "index-rebuild-worker", log) { @Override protected void body() { try { - idx.rebuildIndexesFromHash(cacheName, desc); + int cacheId = CU.cacheId(cacheName); + + GridCacheContext cctx = ctx.cache().context().cacheContext(cacheId); + + idx.rebuildIndexesFromHash(cctx, schemaName, typeName); fut.onDone(); } @@ -1533,7 +1536,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { fut.onDone(e); } catch (Throwable e) { - log.error("Failed to rebuild indexes for type: " + desc.name(), e); + log.error("Failed to rebuild indexes for type: " + typeName, e); fut.onDone(e); @@ -1721,12 +1724,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { final boolean 
keepBinary) { checkxEnabled(); - if (qry.isReplicatedOnly() && qry.getPartitions() != null) - throw new CacheException("Partitions are not supported in replicated only mode."); - - if (qry.isDistributedJoins() && qry.getPartitions() != null) - throw new CacheException( - "Using both partitions and distributed JOINs is not supported for the same query"); + validateSqlFieldsQuery(qry); boolean loc = (qry.isReplicatedOnly() && cctx.isReplicatedAffinityNode()) || cctx.isLocal() || qry.isLocal(); @@ -1734,6 +1732,9 @@ public class GridQueryProcessor extends GridProcessorAdapter { throw new IllegalStateException("Failed to execute query (grid is stopping)."); try { + final String schemaName = idx.schema(cctx.name()); + final int mainCacheId = CU.cacheId(cctx.name()); + IgniteOutClosureX<FieldsQueryCursor<List<?>>> clo; if (loc) { @@ -1741,32 +1742,29 @@ public class GridQueryProcessor extends GridProcessorAdapter { @Override public FieldsQueryCursor<List<?>> applyx() throws IgniteCheckedException { GridQueryCancel cancel = new GridQueryCancel(); - final FieldsQueryCursor<List<?>> cursor = idx.queryLocalSqlFields(cctx, qry, keepBinary, - idx.backupFilter(requestTopVer.get(), qry.getPartitions()), cancel); + FieldsQueryCursor<List<?>> cur; - Iterable<List<?>> iterExec = new Iterable<List<?>>() { - @Override public Iterator<List<?>> iterator() { - sendQueryExecutedEvent(qry.getSql(), qry.getArgs(), cctx.name()); + if (cctx.config().getQueryParallelism() > 1) { + qry.setDistributedJoins(true); - return cursor.iterator(); - } - }; + cur = idx.queryDistributedSqlFields(schemaName, qry, keepBinary, cancel, mainCacheId); + } + else { + IndexingQueryFilter filter = idx.backupFilter(requestTopVer.get(), qry.getPartitions()); - return new QueryCursorImpl<List<?>>(iterExec, cancel) { - @Override public List<GridQueryFieldMetadata> fieldsMeta() { - if (cursor instanceof QueryCursorImpl) - return ((QueryCursorEx)cursor).fieldsMeta(); + cur = idx.queryLocalSqlFields(schemaName, qry, 
keepBinary, filter, cancel); + } - return super.fieldsMeta(); - } - }; + sendQueryExecutedEvent(qry.getSql(), qry.getArgs(), cctx.name()); + + return cur; } }; } else { clo = new IgniteOutClosureX<FieldsQueryCursor<List<?>>>() { @Override public FieldsQueryCursor<List<?>> applyx() throws IgniteCheckedException { - return idx.queryDistributedSqlFields(cctx, qry, keepBinary, null); + return idx.queryDistributedSqlFields(schemaName, qry, keepBinary, null, mainCacheId); } }; } @@ -1782,6 +1780,58 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** + * Query SQL fields without strict dependency on concrete cache. + * + * @param schemaName Schema name. + * @param qry Query. + * @param keepBinary Keep binary flag. + * @return Cursor. + */ + public FieldsQueryCursor<List<?>> querySqlFieldsNoCache(final String schemaName, final SqlFieldsQuery qry, + final boolean keepBinary) { + checkxEnabled(); + + validateSqlFieldsQuery(qry); + + if (qry.isLocal()) + throw new IgniteException("Local query is not supported without specific cache."); + + if (!busyLock.enterBusy()) + throw new IllegalStateException("Failed to execute query (grid is stopping)."); + + try { + IgniteOutClosureX<FieldsQueryCursor<List<?>>> clo = new IgniteOutClosureX<FieldsQueryCursor<List<?>>>() { + @Override public FieldsQueryCursor<List<?>> applyx() throws IgniteCheckedException { + GridQueryCancel cancel = new GridQueryCancel(); + + return idx.queryDistributedSqlFields(schemaName, qry, keepBinary, cancel, null); + } + }; + + return executeQuery(GridCacheQueryType.SQL_FIELDS, qry.getSql(), null, clo, true); + } + catch (IgniteCheckedException e) { + throw new CacheException(e); + } + finally { + busyLock.leaveBusy(); + } + } + + /** + * Validate SQL fields query. + * + * @param qry Query. 
+ */ + private static void validateSqlFieldsQuery(SqlFieldsQuery qry) { + if (qry.isReplicatedOnly() && qry.getPartitions() != null) + throw new CacheException("Partitions are not supported in replicated only mode."); + + if (qry.isDistributedJoins() && qry.getPartitions() != null) + throw new CacheException("Using both partitions and distributed JOINs is not supported for the same query"); + } + + /** * @param cacheName Cache name. * @param streamer Data streamer. * @param qry Query. @@ -1797,9 +1847,11 @@ public class GridQueryProcessor extends GridProcessorAdapter { try { GridCacheContext cctx = ctx.cache().cache(cacheName).context(); + final String schemaName = idx.schema(cacheName); + return executeQuery(GridCacheQueryType.SQL_FIELDS, qry, cctx, new IgniteOutClosureX<Long>() { @Override public Long applyx() throws IgniteCheckedException { - return idx.streamUpdateQuery(cacheName, qry, args, streamer); + return idx.streamUpdateQuery(schemaName, qry, args, streamer); } }, true); } @@ -1848,10 +1900,13 @@ public class GridQueryProcessor extends GridProcessorAdapter { throw new IllegalStateException("Failed to execute query (grid is stopping)."); try { + final String schemaName = idx.schema(cctx.name()); + final int mainCacheId = CU.cacheId(cctx.name()); + return executeQuery(GridCacheQueryType.SQL, qry.getSql(), cctx, new IgniteOutClosureX<QueryCursor<Cache.Entry<K, V>>>() { @Override public QueryCursor<Cache.Entry<K, V>> applyx() throws IgniteCheckedException { - return idx.queryDistributedSql(cctx, qry, keepBinary); + return idx.queryDistributedSql(schemaName, qry, keepBinary, mainCacheId); } }, true); } @@ -1874,6 +1929,9 @@ public class GridQueryProcessor extends GridProcessorAdapter { if (!busyLock.enterBusy()) throw new IllegalStateException("Failed to execute query (grid is stopping)."); + final String schemaName = idx.schema(cctx.name()); + final int mainCacheId = CU.cacheId(cctx.name()); + try { return executeQuery(GridCacheQueryType.SQL, qry.getSql(), 
cctx, new IgniteOutClosureX<QueryCursor<Cache.Entry<K, V>>>() { @@ -1889,8 +1947,14 @@ public class GridQueryProcessor extends GridProcessorAdapter { qry.getArgs(), cctx.name()); - return idx.queryLocalSql(cctx, qry, idx.backupFilter(requestTopVer.get(), qry.getPartitions()), - keepBinary); + if (cctx.config().getQueryParallelism() > 1) { + qry.setDistributedJoins(true); + + return idx.queryDistributedSql(schemaName, qry, keepBinary, mainCacheId); + } + else + return idx.queryLocalSql(schemaName, qry, idx.backupFilter(requestTopVer.get(), + qry.getPartitions()), keepBinary); } }, true); } @@ -2036,7 +2100,9 @@ public class GridQueryProcessor extends GridProcessorAdapter { public PreparedStatement prepareNativeStatement(String cacheName, String sql) throws SQLException { checkxEnabled(); - return idx.prepareNativeStatement(cacheName, sql); + String schemaName = idx.schema(cacheName); + + return idx.prepareNativeStatement(schemaName, sql); } /** @@ -2051,21 +2117,6 @@ public class GridQueryProcessor extends GridProcessorAdapter { /** * @param cacheName Cache name. - * @param nativeStmt Native statement. - * @param autoFlushFreq Automatic data flushing frequency, disabled if {@code 0}. - * @param nodeBufSize Per node buffer size - see {@link IgniteDataStreamer#perNodeBufferSize(int)} - * @param nodeParOps Per node parallel ops count - see {@link IgniteDataStreamer#perNodeParallelOperations(int)} - * @param allowOverwrite Overwrite existing cache values on key duplication. - * @see IgniteDataStreamer#allowOverwrite - * @return {@link IgniteDataStreamer} tailored to specific needs of given native statement based on its metadata. - */ - public IgniteDataStreamer<?, ?> createStreamer(String cacheName, PreparedStatement nativeStmt, long autoFlushFreq, - int nodeBufSize, int nodeParOps, boolean allowOverwrite) { - return idx.createStreamer(cacheName, nativeStmt, autoFlushFreq, nodeBufSize, nodeParOps, allowOverwrite); - } - - /** - * @param cacheName Cache name. 
* @param key Key. * @throws IgniteCheckedException Thrown in case of any errors. */ @@ -2122,8 +2173,9 @@ public class GridQueryProcessor extends GridProcessorAdapter { new IgniteOutClosureX<GridCloseableIterator<IgniteBiTuple<K, V>>>() { @Override public GridCloseableIterator<IgniteBiTuple<K, V>> applyx() throws IgniteCheckedException { String typeName = typeName(cacheName, resType); + String schemaName = idx.schema(cacheName); - return idx.queryLocalText(cacheName, clause, typeName, filters); + return idx.queryLocalText(schemaName, clause, typeName, filters); } }, true); } @@ -2191,7 +2243,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { * @param complete Complete. */ @SuppressWarnings("ThrowableResultOfMethodCallIgnored") - public <R> R executeQuery(GridCacheQueryType qryType, String qry, GridCacheContext<?, ?> cctx, + public <R> R executeQuery(GridCacheQueryType qryType, String qry, @Nullable GridCacheContext<?, ?> cctx, IgniteOutClosureX<R> clo, boolean complete) throws IgniteCheckedException { final long startTime = U.currentTimeMillis(); @@ -2231,7 +2283,8 @@ public class GridQueryProcessor extends GridProcessorAdapter { long duration = U.currentTimeMillis() - startTime; if (complete || failed) { - cctx.queries().collectMetrics(qryType, qry, startTime, duration, failed); + if (cctx != null) + cctx.queries().collectMetrics(qryType, qry, startTime, duration, failed); if (log.isTraceEnabled()) log.trace("Query execution [startTime=" + startTime + ", duration=" + duration + http://git-wip-us.apache.org/repos/asf/ignite/blob/c45de168/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java index 
d48c373..98d123f 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java @@ -68,6 +68,7 @@ import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap; import org.apache.ignite.internal.util.lang.IgniteSingletonIterator; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteInClosure; @@ -138,7 +139,7 @@ public class DmlStatementsProcessor { /** * Execute DML statement, possibly with few re-attempts in case of concurrent data modifications. * - * @param schema Schema. + * @param schemaName Schema. * @param stmt JDBC statement. * @param fieldsQry Original query. * @param loc Query locality flag. @@ -147,13 +148,13 @@ public class DmlStatementsProcessor { * @return Update result (modified items count and failed keys). * @throws IgniteCheckedException if failed. 
*/ - private UpdateResult updateSqlFields(String schema, PreparedStatement stmt, SqlFieldsQuery fieldsQry, + private UpdateResult updateSqlFields(String schemaName, PreparedStatement stmt, SqlFieldsQuery fieldsQry, boolean loc, IndexingQueryFilter filters, GridQueryCancel cancel) throws IgniteCheckedException { Object[] errKeys = null; long items = 0; - UpdatePlan plan = getPlanForStatement(schema, stmt, null); + UpdatePlan plan = getPlanForStatement(schemaName, stmt, null); GridCacheContext<?, ?> cctx = plan.tbl.rowDescriptor().context(); @@ -177,7 +178,7 @@ public class DmlStatementsProcessor { UpdateResult r; try { - r = executeUpdateStatement(cctx, stmt, fieldsQry, loc, filters, cancel, errKeys); + r = executeUpdateStatement(schemaName, cctx, stmt, fieldsQry, loc, filters, cancel, errKeys); } finally { cctx.operationContextPerCall(opCtx); @@ -201,7 +202,7 @@ public class DmlStatementsProcessor { } /** - * @param schema Schema. + * @param schemaName Schema. * @param stmt Prepared statement. * @param fieldsQry Initial query * @param cancel Query cancel. @@ -209,12 +210,12 @@ public class DmlStatementsProcessor { * @throws IgniteCheckedException if failed. 
*/ @SuppressWarnings("unchecked") - QueryCursorImpl<List<?>> updateSqlFieldsDistributed(String schema, PreparedStatement stmt, + QueryCursorImpl<List<?>> updateSqlFieldsDistributed(String schemaName, PreparedStatement stmt, SqlFieldsQuery fieldsQry, GridQueryCancel cancel) throws IgniteCheckedException { - UpdateResult res = updateSqlFields(schema, stmt, fieldsQry, false, null, cancel); + UpdateResult res = updateSqlFields(schemaName, stmt, fieldsQry, false, null, cancel); QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(Collections.singletonList - (Collections.singletonList(res.cnt)), null, false); + (Collections.singletonList(res.cnt)), cancel, false); resCur.fieldsMeta(UPDATE_RESULT_META); @@ -224,7 +225,7 @@ public class DmlStatementsProcessor { /** * Execute DML statement on local cache. * - * @param schema Schema. + * @param schemaName Schema. * @param stmt Prepared statement. * @param fieldsQry Fields query. * @param filters Cache name and key filter. @@ -233,10 +234,10 @@ public class DmlStatementsProcessor { * @throws IgniteCheckedException if failed. 
*/ @SuppressWarnings("unchecked") - GridQueryFieldsResult updateSqlFieldsLocal(String schema, PreparedStatement stmt, + GridQueryFieldsResult updateSqlFieldsLocal(String schemaName, PreparedStatement stmt, SqlFieldsQuery fieldsQry, IndexingQueryFilter filters, GridQueryCancel cancel) throws IgniteCheckedException { - UpdateResult res = updateSqlFields(schema, stmt, fieldsQry, true, filters, cancel); + UpdateResult res = updateSqlFields(schemaName, stmt, fieldsQry, true, filters, cancel); return new GridQueryFieldsResultAdapter(UPDATE_RESULT_META, new IgniteSingletonIterator(Collections.singletonList(res.cnt))); @@ -276,8 +277,8 @@ public class DmlStatementsProcessor { final ArrayList<List<?>> data = new ArrayList<>(plan.rowsNum); - final GridQueryFieldsResult res = idx.queryLocalSqlFields(cctx.name(), plan.selectQry, F.asList(args), - null, false, 0, null); + final GridQueryFieldsResult res = idx.queryLocalSqlFields(idx.schema(cctx.name()), plan.selectQry, + F.asList(args), null, false, 0, null); QueryCursorImpl<List<?>> stepCur = new QueryCursorImpl<>(new Iterable<List<?>>() { @Override public Iterator<List<?>> iterator() { @@ -327,6 +328,7 @@ public class DmlStatementsProcessor { /** * Actually perform SQL DML operation locally. * + * @param schemaName Schema name. * @param cctx Cache context. * @param prepStmt Prepared statement for DML query. * @param fieldsQry Fields query. @@ -336,12 +338,14 @@ public class DmlStatementsProcessor { * @throws IgniteCheckedException if failed. 
*/ @SuppressWarnings({"ConstantConditions", "unchecked"}) - private UpdateResult executeUpdateStatement(final GridCacheContext cctx, PreparedStatement prepStmt, - SqlFieldsQuery fieldsQry, boolean loc, IndexingQueryFilter filters, GridQueryCancel cancel, - Object[] failedKeys) throws IgniteCheckedException { + private UpdateResult executeUpdateStatement(String schemaName, final GridCacheContext cctx, + PreparedStatement prepStmt, SqlFieldsQuery fieldsQry, boolean loc, IndexingQueryFilter filters, + GridQueryCancel cancel, Object[] failedKeys) throws IgniteCheckedException { + int mainCacheId = CU.cacheId(cctx.name()); + Integer errKeysPos = null; - UpdatePlan plan = getPlanForStatement(idx.schema(cctx.name()), prepStmt, errKeysPos); + UpdatePlan plan = getPlanForStatement(schemaName, prepStmt, errKeysPos); if (plan.fastUpdateArgs != null) { assert F.isEmpty(failedKeys) && errKeysPos == null; @@ -364,16 +368,17 @@ public class DmlStatementsProcessor { .setPageSize(fieldsQry.getPageSize()) .setTimeout(fieldsQry.getTimeout(), TimeUnit.MILLISECONDS); - cur = (QueryCursorImpl<List<?>>) idx.queryDistributedSqlFields(cctx, newFieldsQry, true, cancel); + cur = (QueryCursorImpl<List<?>>) idx.queryDistributedSqlFields(schemaName, newFieldsQry, true, cancel, + mainCacheId); } else { - final GridQueryFieldsResult res = idx.queryLocalSqlFields(cctx.name(), plan.selectQry, + final GridQueryFieldsResult res = idx.queryLocalSqlFields(schemaName, plan.selectQry, F.asList(fieldsQry.getArgs()), filters, fieldsQry.isEnforceJoinOrder(), fieldsQry.getTimeout(), cancel); cur = new QueryCursorImpl<>(new Iterable<List<?>>() { @Override public Iterator<List<?>> iterator() { try { - return new GridQueryCacheObjectsIterator(res.iterator(), idx.objectContext(), cctx.keepBinary()); + return new GridQueryCacheObjectsIterator(res.iterator(), idx.objectContext(), true); } catch (IgniteCheckedException e) { throw new IgniteException(e); 
http://git-wip-us.apache.org/repos/asf/ignite/blob/c45de168/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 1e19954..bd611f6 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -62,6 +62,7 @@ import org.apache.ignite.internal.jdbc2.JdbcSqlFieldsQuery; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryImpl; import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.CacheObjectUtils; import org.apache.ignite.internal.processors.cache.CacheObjectValueContext; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; @@ -398,35 +399,10 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** {@inheritDoc} */ - @Override public PreparedStatement prepareNativeStatement(String cacheName, String sql) throws SQLException { - String schemaName = schema(cacheName); - - return prepareStatement(connectionForSchema(schemaName), sql, true); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public IgniteDataStreamer<?, ?> createStreamer(String cacheName, PreparedStatement nativeStmt, - long autoFlushFreq, int nodeBufSize, int nodeParOps, boolean allowOverwrite) { - Prepared prep = GridSqlQueryParser.prepared(nativeStmt); - - if (!(prep instanceof Insert)) - throw new IgniteSQLException("Only INSERT operations 
are supported in streaming mode", - IgniteQueryErrorCode.UNSUPPORTED_OPERATION); - - IgniteDataStreamer streamer = ctx.grid().dataStreamer(cacheName); - - streamer.autoFlushFrequency(autoFlushFreq); - - streamer.allowOverwrite(allowOverwrite); - - if (nodeBufSize > 0) - streamer.perNodeBufferSize(nodeBufSize); - - if (nodeParOps > 0) - streamer.perNodeParallelOperations(nodeParOps); + @Override public PreparedStatement prepareNativeStatement(String schemaName, String sql) throws SQLException { + Connection conn = connectionForSchema(schemaName); - return streamer; + return prepareStatement(conn, sql, true); } /** @@ -564,7 +540,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { GridCacheVersion ver, long expirationTime, long link) throws IgniteCheckedException { - H2TableDescriptor tbl = tableDescriptor(type.name(), cacheName); + H2TableDescriptor tbl = tableDescriptor(schema(cacheName), type.name()); if (tbl == null) return; // Type was rejected. @@ -588,7 +564,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { if (log.isDebugEnabled()) log.debug("Removing key from cache query index [locId=" + nodeId + ", key=" + key + ", val=" + val + ']'); - H2TableDescriptor tbl = tableDescriptor(type.name(), cacheName); + H2TableDescriptor tbl = tableDescriptor(schema(cacheName), type.name()); if (tbl == null) return; @@ -782,13 +758,12 @@ public class IgniteH2Indexing implements GridQueryIndexing { } @SuppressWarnings("unchecked") - @Override public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalText( - String cacheName, String qry, String typeName, - IndexingQueryFilter filters) throws IgniteCheckedException { - H2TableDescriptor tbl = tableDescriptor(typeName, cacheName); + @Override public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalText(String schemaName, String qry, + String typeName, IndexingQueryFilter filters) throws IgniteCheckedException { + H2TableDescriptor tbl = tableDescriptor(schemaName, typeName); if 
(tbl != null && tbl.luceneIndex() != null) { - GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, TEXT, cacheName, + GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, TEXT, schemaName, U.currentTimeMillis(), null, true); try { @@ -805,9 +780,9 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** {@inheritDoc} */ - @Override public void unregisterType(String cacheName, String typeName) + @Override public void unregisterType(String schemaName, String typeName) throws IgniteCheckedException { - H2TableDescriptor tbl = tableDescriptor(typeName, cacheName); + H2TableDescriptor tbl = tableDescriptor(schemaName, typeName); if (tbl != null) removeTable(tbl); @@ -816,7 +791,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { /** * Queries individual fields (generally used by JDBC drivers). * - * @param cacheName Cache name. + * @param schemaName Schema name. * @param qry Query. * @param params Query parameters. * @param filter Cache name and key filter. @@ -827,12 +802,10 @@ public class IgniteH2Indexing implements GridQueryIndexing { * @throws IgniteCheckedException If failed. 
*/ @SuppressWarnings("unchecked") - public GridQueryFieldsResult queryLocalSqlFields(final String cacheName, final String qry, + public GridQueryFieldsResult queryLocalSqlFields(final String schemaName, final String qry, @Nullable final Collection<Object> params, final IndexingQueryFilter filter, boolean enforceJoinOrder, final int timeout, final GridQueryCancel cancel) throws IgniteCheckedException { - final String schema = schema(cacheName); - - final Connection conn = connectionForSchema(schema); + final Connection conn = connectionForSchema(schemaName); H2Utils.setupConnection(conn, false, enforceJoinOrder); @@ -849,7 +822,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { fldsQry.setEnforceJoinOrder(enforceJoinOrder); fldsQry.setTimeout(timeout, TimeUnit.MILLISECONDS); - return dmlProc.updateSqlFieldsLocal(schema, stmt, fldsQry, filter, cancel); + return dmlProc.updateSqlFieldsLocal(schemaName, stmt, fldsQry, filter, cancel); } else if (DdlStatementsProcessor.isDdlStatement(p)) throw new IgniteSQLException("DDL statements are supported for the whole cluster only", @@ -874,12 +847,12 @@ public class IgniteH2Indexing implements GridQueryIndexing { GridH2QueryContext.set(ctx); GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, SQL_FIELDS, - cacheName, U.currentTimeMillis(), cancel, true); + schemaName, U.currentTimeMillis(), cancel, true); runs.putIfAbsent(run.id(), run); try { - ResultSet rs = executeSqlQueryWithTimer(schema, stmt, conn, qry, params, timeout, cancel); + ResultSet rs = executeSqlQueryWithTimer(schemaName, stmt, conn, qry, params, timeout, cancel); return new H2FieldsIterator(rs); } @@ -893,10 +866,8 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** {@inheritDoc} */ - @Override public long streamUpdateQuery(String cacheName, String qry, + @Override public long streamUpdateQuery(String schemaName, String qry, @Nullable Object[] params, IgniteDataStreamer<?, ?> streamer) throws 
IgniteCheckedException { - String schemaName = schema(cacheName); - final Connection conn = connectionForSchema(schemaName); final PreparedStatement stmt; @@ -1074,97 +1045,76 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** {@inheritDoc} */ - @Override public FieldsQueryCursor<List<?>> queryLocalSqlFields(final GridCacheContext<?, ?> cctx, - final SqlFieldsQuery qry, final boolean keepBinary, final IndexingQueryFilter filter, - final GridQueryCancel cancel) throws IgniteCheckedException { - - if (cctx.config().getQueryParallelism() > 1) { - qry.setDistributedJoins(true); + @Override public FieldsQueryCursor<List<?>> queryLocalSqlFields(String schemaName, SqlFieldsQuery qry, + final boolean keepBinary, IndexingQueryFilter filter, GridQueryCancel cancel) throws IgniteCheckedException { + String sql = qry.getSql(); + Object[] args = qry.getArgs(); - assert qry.isLocal(); + final GridQueryFieldsResult res = queryLocalSqlFields(schemaName, sql, F.asList(args), filter, + qry.isEnforceJoinOrder(), qry.getTimeout(), cancel); - return queryDistributedSqlFields(cctx, qry, keepBinary, cancel); - } - else { - final String cacheName = cctx.name(); - final String sql = qry.getSql(); - final Object[] args = qry.getArgs(); - - final GridQueryFieldsResult res = queryLocalSqlFields(cacheName, sql, F.asList(args), filter, - qry.isEnforceJoinOrder(), qry.getTimeout(), cancel); - - QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(new Iterable<List<?>>() { - @Override public Iterator<List<?>> iterator() { - try { - return new GridQueryCacheObjectsIterator(res.iterator(), objectContext(), keepBinary); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } + QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(new Iterable<List<?>>() { + @Override public Iterator<List<?>> iterator() { + try { + return new GridQueryCacheObjectsIterator(res.iterator(), objectContext(), keepBinary); + } + catch (IgniteCheckedException e) { + throw new 
IgniteException(e); } - }, cancel); + } + }, cancel); - cursor.fieldsMeta(res.metaData()); + cursor.fieldsMeta(res.metaData()); - return cursor; - } + return cursor; } /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public <K, V> QueryCursor<Cache.Entry<K,V>> queryLocalSql(final GridCacheContext<?, ?> cctx, + @Override public <K, V> QueryCursor<Cache.Entry<K,V>> queryLocalSql(String schemaName, final SqlQuery qry, final IndexingQueryFilter filter, final boolean keepBinary) throws IgniteCheckedException { - if (cctx.config().getQueryParallelism() > 1) { - qry.setDistributedJoins(true); + String type = qry.getType(); + String sqlQry = qry.getSql(); + String alias = qry.getAlias(); + Object[] params = qry.getArgs(); - assert qry.isLocal(); + GridQueryCancel cancel = new GridQueryCancel(); - return queryDistributedSql(cctx, qry, keepBinary); - } - else { - String cacheName = cctx.name(); - String type = qry.getType(); - String sqlQry = qry.getSql(); - String alias = qry.getAlias(); - Object[] params = qry.getArgs(); - - GridQueryCancel cancel = new GridQueryCancel(); - - final GridCloseableIterator<IgniteBiTuple<K, V>> i = queryLocalSql(cacheName, sqlQry, alias, - F.asList(params), type, filter, cancel); - - return new QueryCursorImpl<>(new Iterable<Cache.Entry<K, V>>() { - @Override public Iterator<Cache.Entry<K, V>> iterator() { - return new ClIter<Cache.Entry<K, V>>() { - @Override public void close() throws Exception { - i.close(); - } + final GridCloseableIterator<IgniteBiTuple<K, V>> i = queryLocalSql(schemaName, sqlQry, alias, + F.asList(params), type, filter, cancel); - @Override public boolean hasNext() { - return i.hasNext(); - } + return new QueryCursorImpl<>(new Iterable<Cache.Entry<K, V>>() { + @Override public Iterator<Cache.Entry<K, V>> iterator() { + return new ClIter<Cache.Entry<K, V>>() { + @Override public void close() throws Exception { + i.close(); + } - @Override public Cache.Entry<K, V> next() { - IgniteBiTuple<K, V> t = i.next(); 
+ @Override public boolean hasNext() { + return i.hasNext(); + } - return new CacheEntryImpl<>( - (K)cctx.unwrapBinaryIfNeeded(t.get1(), keepBinary, false), - (V)cctx.unwrapBinaryIfNeeded(t.get2(), keepBinary, false)); - } + @Override public Cache.Entry<K, V> next() { + IgniteBiTuple<K, V> t = i.next(); - @Override public void remove() { - throw new UnsupportedOperationException(); - } - }; - } - }, cancel); - } + K key = (K)CacheObjectUtils.unwrapBinaryIfNeeded(objectContext(), t.get1(), keepBinary, false); + V val = (V)CacheObjectUtils.unwrapBinaryIfNeeded(objectContext(), t.get2(), keepBinary, false); + + return new CacheEntryImpl<>(key, val); + } + + @Override public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }, cancel); } /** * Executes regular query. * - * @param cacheName Cache name. + * @param schemaName Schema name. * @param qry Query. * @param alias Table alias. * @param params Query parameters. @@ -1174,10 +1124,10 @@ public class IgniteH2Indexing implements GridQueryIndexing { * @throws IgniteCheckedException If failed. 
*/ @SuppressWarnings("unchecked") - public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalSql(String cacheName, + public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalSql(String schemaName, final String qry, String alias, @Nullable final Collection<Object> params, String type, final IndexingQueryFilter filter, GridQueryCancel cancel) throws IgniteCheckedException { - final H2TableDescriptor tbl = tableDescriptor(type, cacheName); + final H2TableDescriptor tbl = tableDescriptor(schemaName, type); if (tbl == null) throw new IgniteSQLException("Failed to find SQL table for type: " + type, @@ -1192,13 +1142,13 @@ public class IgniteH2Indexing implements GridQueryIndexing { GridH2QueryContext.set(new GridH2QueryContext(nodeId, nodeId, 0, LOCAL).filter(filter) .distributedJoinMode(OFF)); - GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, SQL, cacheName, + GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, SQL, schemaName, U.currentTimeMillis(), null, true); runs.put(run.id(), run); try { - ResultSet rs = executeSqlQueryWithTimer(schema(cacheName), conn, sql, params, true, 0, cancel); + ResultSet rs = executeSqlQueryWithTimer(schemaName, conn, sql, params, true, 0, cancel); return new H2KeyValueIterator(rs); } @@ -1237,12 +1187,11 @@ public class IgniteH2Indexing implements GridQueryIndexing { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public <K, V> QueryCursor<Cache.Entry<K, V>> queryDistributedSql(GridCacheContext<?, ?> cctx, - SqlQuery qry, boolean keepBinary) { + @Override public <K, V> QueryCursor<Cache.Entry<K, V>> queryDistributedSql(String schemaName, SqlQuery qry, + boolean keepBinary, int mainCacheId) { String type = qry.getType(); - String cacheName = cctx.name(); - H2TableDescriptor tblDesc = tableDescriptor(type, cacheName); + H2TableDescriptor tblDesc = tableDescriptor(schemaName, type); if (tblDesc == null) throw new 
IgniteSQLException("Failed to find SQL table for type: " + type, @@ -1268,7 +1217,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { if (qry.getTimeout() > 0) fqry.setTimeout(qry.getTimeout(), TimeUnit.MILLISECONDS); - final QueryCursor<List<?>> res = queryDistributedSqlFields(cctx, fqry, keepBinary, null); + final QueryCursor<List<?>> res = queryDistributedSqlFields(schemaName, fqry, keepBinary, null, mainCacheId); final Iterable<Cache.Entry<K, V>> converted = new Iterable<Cache.Entry<K, V>>() { @Override public Iterator<Cache.Entry<K, V>> iterator() { @@ -1301,12 +1250,10 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** {@inheritDoc} */ - @Override public FieldsQueryCursor<List<?>> queryDistributedSqlFields(GridCacheContext<?, ?> cctx, - SqlFieldsQuery qry, boolean keepBinary, GridQueryCancel cancel) { + @Override public FieldsQueryCursor<List<?>> queryDistributedSqlFields(String schemaName, + SqlFieldsQuery qry, boolean keepBinary, GridQueryCancel cancel, @Nullable Integer mainCacheId) { final String sqlQry = qry.getSql(); - String schemaName = schema(cctx.name()); - Connection c = connectionForSchema(schemaName); final boolean enforceJoinOrder = qry.isEnforceJoinOrder(); @@ -1413,14 +1360,14 @@ public class IgniteH2Indexing implements GridQueryIndexing { LinkedHashSet<Integer> caches0 = new LinkedHashSet<>(); - // Setup caches from schemas. 
assert twoStepQry != null; int tblCnt = twoStepQry.tablesCount(); - if (tblCnt > 0) { - caches0.add(cctx.cacheId()); + if (mainCacheId != null) + caches0.add(mainCacheId); + if (tblCnt > 0) { for (QueryTable tblKey : twoStepQry.tables()) { GridH2Table tbl = dataTable(tblKey); @@ -1429,8 +1376,9 @@ public class IgniteH2Indexing implements GridQueryIndexing { caches0.add(cacheId); } } - else - caches0.add(cctx.cacheId()); + + if (caches0.isEmpty()) + throw new IgniteSQLException("Failed to find at least one cache for SQL statement: " + sqlQry); //Prohibit usage indices with different numbers of segments in same query. List<Integer> cacheIds = new ArrayList<>(caches0); @@ -1470,6 +1418,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { if (cachedQry == null && !twoStepQry.explain()) { cachedQry = new H2TwoStepCachedQuery(meta, twoStepQry.copy()); + twoStepCache.putIfAbsent(cachedQryKey, cachedQry); } @@ -1732,30 +1681,24 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** - * Gets table descriptor by type and cache names. + * Get table descriptor. * + * @param schemaName Schema name. * @param type Type name. - * @param cacheName Cache name. - * @return Table descriptor. + * @return Descriptor. */ - @Nullable private H2TableDescriptor tableDescriptor(String type, String cacheName) { - String schemaName = schema(cacheName); - + @Nullable private H2TableDescriptor tableDescriptor(String schemaName, String type) { H2Schema schema = schemas.get(schemaName); if (schema == null) return null; return schema.tableByTypeName(type); - } + }; - /** - * Gets database schema from cache name. - * - * @param cacheName Cache name. {@code null} would be converted to an empty string. - * @return Schema name. Should not be null since we should not fail for an invalid cache name. 
- */ - public String schema(String cacheName) { + + /** {@inheritDoc} */ + @Override public String schema(String cacheName) { String res = cacheName2schema.get(cacheName); if (res == null) @@ -1764,6 +1707,13 @@ public class IgniteH2Indexing implements GridQueryIndexing { return res; } + /** {@inheritDoc} */ + @Override public boolean isInsertStatement(PreparedStatement nativeStmt) { + Prepared prep = GridSqlQueryParser.prepared(nativeStmt); + + return prep instanceof Insert; + } + /** * Called periodically by {@link GridTimeoutProcessor} to clean up the {@link #stmtCache}. */ @@ -1792,17 +1742,11 @@ public class IgniteH2Indexing implements GridQueryIndexing { return schema.cacheName(); } - /** - * Rebuild indexes from hash index. - * - * @param cacheName Cache name. - * @param type Type descriptor. - * @throws IgniteCheckedException If failed. - */ + /** {@inheritDoc} */ @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") - @Override public void rebuildIndexesFromHash(String cacheName, - GridQueryTypeDescriptor type) throws IgniteCheckedException { - H2TableDescriptor tbl = tableDescriptor(type.name(), cacheName); + @Override public void rebuildIndexesFromHash(GridCacheContext cctx, String schemaName, String typeName) + throws IgniteCheckedException { + H2TableDescriptor tbl = tableDescriptor(schemaName, typeName); if (tbl == null) return; @@ -1815,10 +1759,6 @@ public class IgniteH2Indexing implements GridQueryIndexing { Cursor cursor = hashIdx.find((Session)null, null, null); - int cacheId = CU.cacheId(cacheName); - - GridCacheContext cctx = ctx.cache().context().cacheContext(cacheId); - while (cursor.next()) { CacheDataRow dataRow = (CacheDataRow)cursor.get(); @@ -1859,8 +1799,8 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** {@inheritDoc} */ - @Override public void markForRebuildFromHash(String cacheName, GridQueryTypeDescriptor type) { - H2TableDescriptor tbl = tableDescriptor(type.name(), cacheName); + @Override public 
void markForRebuildFromHash(String schemaName, String typeName) { + H2TableDescriptor tbl = tableDescriptor(schemaName, typeName); if (tbl == null) return; @@ -1871,40 +1811,6 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** - * Gets size (for tests only). - * - * @param cacheName Cache name. - * @param typeName Type name. - * @return Size. - * @throws IgniteCheckedException If failed or {@code -1} if the type is unknown. - */ - long size(String cacheName, String typeName) throws IgniteCheckedException { - String schemaName = schema(cacheName); - - H2TableDescriptor tbl = tableDescriptor(typeName, cacheName); - - if (tbl == null) - return -1; - - Connection conn = connectionForSchema(schemaName); - - H2Utils.setupConnection(conn, false, false); - - try { - ResultSet rs = executeSqlQuery(conn, prepareStatement(conn, "SELECT COUNT(*) FROM " + tbl.fullTableName(), - false), 0, null); - - if (!rs.next()) - throw new IllegalStateException(); - - return rs.getLong(1); - } - catch (SQLException e) { - throw new IgniteCheckedException(e); - } - } - - /** * @return Busy lock. 
*/ public GridSpinBusyLock busyLock() { http://git-wip-us.apache.org/repos/asf/ignite/blob/c45de168/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java index 73a7191..7b0cbf8 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java @@ -247,33 +247,25 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract public void testSpi() throws Exception { IgniteH2Indexing spi = getIndexing(); - assertEquals(-1, spi.size(typeAA.cacheName(), typeAA.name())); - assertEquals(-1, spi.size(typeAB.cacheName(), typeAB.name())); - assertEquals(-1, spi.size(typeBA.cacheName(), typeBA.name())); - IgniteCache<Integer, BinaryObject> cacheA = ignite0.createCache(cacheACfg()); - assertEquals(0, spi.size(typeAA.cacheName(), typeAA.name())); - assertEquals(0, spi.size(typeAB.cacheName(), typeAB.name())); - assertEquals(-1, spi.size(typeBA.cacheName(), typeBA.name())); - IgniteCache<Integer, BinaryObject> cacheB = ignite0.createCache(cacheBCfg()); - // Initially all is empty. 
- assertEquals(0, spi.size(typeAA.cacheName(), typeAA.name())); - assertEquals(0, spi.size(typeAB.cacheName(), typeAB.name())); - assertEquals(0, spi.size(typeBA.cacheName(), typeBA.name())); + assertFalse(spi.queryLocalSql(spi.schema(typeAA.cacheName()), "select * from A.A", null, Collections.emptySet(), + typeAA.name(), null, null).hasNext()); - assertFalse(spi.queryLocalSql(typeAA.cacheName(), "select * from A.A", null, Collections.emptySet(), typeAA.name(), null, null).hasNext()); - assertFalse(spi.queryLocalSql(typeAB.cacheName(), "select * from A.B", null, Collections.emptySet(), typeAB.name(), null, null).hasNext()); - assertFalse(spi.queryLocalSql(typeBA.cacheName(), "select * from B.A", null, Collections.emptySet(), typeBA.name(), null, null).hasNext()); + assertFalse(spi.queryLocalSql(spi.schema(typeAB.cacheName()), "select * from A.B", null, Collections.emptySet(), + typeAB.name(), null, null).hasNext()); - assertFalse(spi.queryLocalSql(typeBA.cacheName(), "select * from B.A, A.B, A.A", null, + assertFalse(spi.queryLocalSql(spi.schema(typeBA.cacheName()), "select * from B.A", null, Collections.emptySet(), + typeBA.name(), null, null).hasNext()); + + assertFalse(spi.queryLocalSql(spi.schema(typeBA.cacheName()), "select * from B.A, A.B, A.A", null, Collections.emptySet(), typeBA.name(), null, null).hasNext()); try { - spi.queryLocalSql(typeBA.cacheName(), "select aa.*, ab.*, ba.* from A.A aa, A.B ab, B.A ba", null, - Collections.emptySet(), typeBA.name(), null, null).hasNext(); + spi.queryLocalSql(spi.schema(typeBA.cacheName()), "select aa.*, ab.*, ba.* from A.A aa, A.B ab, B.A ba", + null, Collections.emptySet(), typeBA.name(), null, null).hasNext(); fail("Enumerations of aliases in select block must be prohibited"); } @@ -281,60 +273,23 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract // all fine } - assertFalse(spi.queryLocalSql(typeAB.cacheName(), "select ab.* from A.B ab", null, + 
assertFalse(spi.queryLocalSql(spi.schema(typeAB.cacheName()), "select ab.* from A.B ab", null, Collections.emptySet(), typeAB.name(), null, null).hasNext()); - assertFalse(spi.queryLocalSql(typeBA.cacheName(), "select ba.* from B.A as ba", null, + assertFalse(spi.queryLocalSql(spi.schema(typeBA.cacheName()), "select ba.* from B.A as ba", null, Collections.emptySet(), typeBA.name(), null, null).hasNext()); cacheA.put(1, aa("A", 1, "Vasya", 10).build()); - - assertEquals(1, spi.size(typeAA.cacheName(), typeAA.name())); - assertEquals(0, spi.size(typeAB.cacheName(), typeAB.name())); - assertEquals(0, spi.size(typeBA.cacheName(), typeBA.name())); - cacheA.put(1, ab(1, "Vasya", 20, "Some text about Vasya goes here.").build()); - - // In one cache all keys must be unique. - assertEquals(0, spi.size(typeAA.cacheName(), typeAA.name())); - assertEquals(1, spi.size(typeAB.cacheName(), typeAB.name())); - assertEquals(0, spi.size(typeBA.cacheName(), typeBA.name())); - cacheB.put(1, ba(2, "Petya", 25, true).build()); - - // No replacement because of different cache. - assertEquals(0, spi.size(typeAA.cacheName(), typeAA.name())); - assertEquals(1, spi.size(typeAB.cacheName(), typeAB.name())); - assertEquals(1, spi.size(typeBA.cacheName(), typeBA.name())); - cacheB.put(1, ba(2, "Kolya", 25, true).build()); - - // Replacement in the same table. 
- assertEquals(0, spi.size(typeAA.cacheName(), typeAA.name())); - assertEquals(1, spi.size(typeAB.cacheName(), typeAB.name())); - assertEquals(1, spi.size(typeBA.cacheName(), typeBA.name())); - cacheA.put(2, aa("A", 2, "Valera", 19).build()); - - assertEquals(1, spi.size(typeAA.cacheName(), typeAA.name())); - assertEquals(1, spi.size(typeAB.cacheName(), typeAB.name())); - assertEquals(1, spi.size(typeBA.cacheName(), typeBA.name())); - cacheA.put(3, aa("A", 3, "Borya", 18).build()); - - assertEquals(2, spi.size(typeAA.cacheName(), typeAA.name())); - assertEquals(1, spi.size(typeAB.cacheName(), typeAB.name())); - assertEquals(1, spi.size(typeBA.cacheName(), typeBA.name())); - cacheA.put(4, ab(4, "Vitalya", 20, "Very Good guy").build()); - assertEquals(2, spi.size(typeAA.cacheName(), typeAA.name())); - assertEquals(2, spi.size(typeAB.cacheName(), typeAB.name())); - assertEquals(1, spi.size(typeBA.cacheName(), typeBA.name())); - // Query data. - Iterator<IgniteBiTuple<Integer, BinaryObjectImpl>> res = - spi.queryLocalSql(typeAA.cacheName(), "from a order by age", null, Collections.emptySet(), typeAA.name(), null, null); + Iterator<IgniteBiTuple<Integer, BinaryObjectImpl>> res = spi.queryLocalSql(spi.schema(typeAA.cacheName()), + "from a order by age", null, Collections.emptySet(), typeAA.name(), null, null); assertTrue(res.hasNext()); assertEquals(aa("A", 3, "Borya", 18).build(), value(res.next())); @@ -342,7 +297,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract assertEquals(aa("A", 2, "Valera", 19).build(), value(res.next())); assertFalse(res.hasNext()); - res = spi.queryLocalSql(typeAA.cacheName(), "select aa.* from a aa order by aa.age", null, + res = spi.queryLocalSql(spi.schema(typeAA.cacheName()), "select aa.* from a aa order by aa.age", null, Collections.emptySet(), typeAA.name(), null, null); assertTrue(res.hasNext()); @@ -351,7 +306,8 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract 
assertEquals(aa("A", 2, "Valera", 19).build(), value(res.next())); assertFalse(res.hasNext()); - res = spi.queryLocalSql(typeAB.cacheName(), "from b order by name", null, Collections.emptySet(), typeAB.name(), null, null); + res = spi.queryLocalSql(spi.schema(typeAB.cacheName()), "from b order by name", null, Collections.emptySet(), + typeAB.name(), null, null); assertTrue(res.hasNext()); assertEquals(ab(1, "Vasya", 20, "Some text about Vasya goes here.").build(), value(res.next())); @@ -359,7 +315,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract assertEquals(ab(4, "Vitalya", 20, "Very Good guy").build(), value(res.next())); assertFalse(res.hasNext()); - res = spi.queryLocalSql(typeAB.cacheName(), "select bb.* from b as bb order by bb.name", null, + res = spi.queryLocalSql(spi.schema(typeAB.cacheName()), "select bb.* from b as bb order by bb.name", null, Collections.emptySet(), typeAB.name(), null, null); assertTrue(res.hasNext()); @@ -368,16 +324,16 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract assertEquals(ab(4, "Vitalya", 20, "Very Good guy").build(), value(res.next())); assertFalse(res.hasNext()); - - res = spi.queryLocalSql(typeBA.cacheName(), "from a", null, Collections.emptySet(), typeBA.name(), null, null); + res = spi.queryLocalSql(spi.schema(typeBA.cacheName()), "from a", null, Collections.emptySet(), typeBA.name(), + null, null); assertTrue(res.hasNext()); assertEquals(ba(2, "Kolya", 25, true).build(), value(res.next())); assertFalse(res.hasNext()); // Text queries - Iterator<IgniteBiTuple<Integer, BinaryObjectImpl>> txtRes = spi.queryLocalText(typeAB.cacheName(), "good", - typeAB.name(), null); + Iterator<IgniteBiTuple<Integer, BinaryObjectImpl>> txtRes = spi.queryLocalText(spi.schema(typeAB.cacheName()), + "good", typeAB.name(), null); assertTrue(txtRes.hasNext()); assertEquals(ab(4, "Vitalya", 20, "Very Good guy").build(), value(txtRes.next())); @@ -385,7 +341,7 @@ public 
abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract // Fields query GridQueryFieldsResult fieldsRes = - spi.queryLocalSqlFields("A", "select a.a.name n1, a.a.age a1, b.a.name n2, " + + spi.queryLocalSqlFields(spi.schema("A"), "select a.a.name n1, a.a.age a1, b.a.name n2, " + "b.a.age a2 from a.a, b.a where a.a.id = b.a.id ", Collections.emptySet(), null, false, 0, null); String[] aliases = {"N1", "A1", "N2", "A2"}; @@ -410,33 +366,12 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract // Remove cacheA.remove(2); - - assertEquals(1, spi.size(typeAA.cacheName(), typeAA.name())); - assertEquals(2, spi.size(typeAB.cacheName(), typeAB.name())); - assertEquals(1, spi.size(typeBA.cacheName(), typeBA.name())); - cacheB.remove(1); - assertEquals(1, spi.size(typeAA.cacheName(), typeAA.name())); - assertEquals(2, spi.size(typeAB.cacheName(), typeAB.name())); - assertEquals(0, spi.size(typeBA.cacheName(), typeBA.name())); - // Unregister. - spi.unregisterType(typeAA.cacheName(), typeAA.name()); - - assertEquals(-1, spi.size(typeAA.cacheName(), typeAA.name())); - assertEquals(2, spi.size(typeAB.cacheName(), typeAB.name())); - assertEquals(0, spi.size(typeBA.cacheName(), typeBA.name())); - - spi.unregisterType(typeAB.cacheName(), typeAB.name()); - - assertEquals(-1, spi.size(typeAA.cacheName(), typeAA.name())); - assertEquals(-1, spi.size(typeAB.cacheName(), typeAB.name())); - assertEquals(0, spi.size(typeBA.cacheName(), typeBA.name())); - - spi.unregisterType(typeBA.cacheName(), typeBA.name()); - - assertEquals(-1, spi.size(typeAA.cacheName(), typeAA.name())); + spi.unregisterType(spi.schema(typeAA.cacheName()), typeAA.name()); + spi.unregisterType(spi.schema(typeAB.cacheName()), typeAB.name()); + spi.unregisterType(spi.schema(typeBA.cacheName()), typeBA.name()); } /** @@ -469,8 +404,8 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract time = now; range *= 3; - GridQueryFieldsResult res = 
spi.queryLocalSqlFields("A", sql, Arrays.<Object>asList(1, range), - null, false, 0, null); + GridQueryFieldsResult res = spi.queryLocalSqlFields(spi.schema("A"), sql, Arrays.<Object>asList(1, + range), null, false, 0, null); assert res.iterator().hasNext();
