Repository: ignite Updated Branches: refs/heads/master 625ad412c -> 692e48880
http://git-wip-us.apache.org/repos/asf/ignite/blob/692e4888/modules/core/src/main/java/org/apache/ignite/internal/processors/query/SqlClientContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/SqlClientContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/SqlClientContext.java new file mode 100644 index 0000000..b185535 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/SqlClientContext.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.query; + +import java.util.HashMap; +import java.util.Map; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * Container for connection properties passed by various drivers (JDBC drivers, possibly ODBC) having notion of an + * <b>SQL connection</b> - Ignite basically does not have one.<p> + * Also contains anything that a driver may need to share between threads processing queries of logically same client - + * see JDBC thin driver + */ +public class SqlClientContext implements AutoCloseable { + /** Kernal context. */ + private final GridKernalContext ctx; + + /** Distributed joins flag. */ + private final boolean distributedJoins; + + /** Enforce join order flag. */ + private final boolean enforceJoinOrder; + + /** Collocated flag. */ + private final boolean collocated; + + /** Lazy query execution flag. */ + private final boolean lazy; + + /** Skip reducer on update flag. */ + private final boolean skipReducerOnUpdate; + + /** Allow overwrites for duplicate keys on streamed {@code INSERT}s. */ + private final boolean streamAllowOverwrite; + + /** Parallel ops count per node for data streamer. */ + private final int streamNodeParOps; + + /** Node buffer size for data streamer. */ + private final int streamNodeBufSize; + + /** Auto flush frequency for streaming. */ + private final long streamFlushTimeout; + + /** Streamers for various caches. */ + private final Map<String, IgniteDataStreamer<?, ?>> streamers; + + /** Logger. */ + private final IgniteLogger log; + + /** + * @param ctx Kernal context. + * @param distributedJoins Distributed joins flag. + * @param enforceJoinOrder Enforce join order flag. + * @param collocated Collocated flag. + * @param lazy Lazy query execution flag. + * @param skipReducerOnUpdate Skip reducer on update flag. 
+ * @param stream Streaming state flag + * @param streamAllowOverwrite Allow overwrites for duplicate keys on streamed {@code INSERT}s. + * @param streamNodeParOps Parallel ops count per node for data streamer. + * @param streamNodeBufSize Node buffer size for data streamer. + * @param streamFlushTimeout Auto flush frequency for streaming. + */ + public SqlClientContext(GridKernalContext ctx, boolean distributedJoins, boolean enforceJoinOrder, + boolean collocated, boolean lazy, boolean skipReducerOnUpdate, boolean stream, boolean streamAllowOverwrite, + int streamNodeParOps, int streamNodeBufSize, long streamFlushTimeout) { + this.ctx = ctx; + this.distributedJoins = distributedJoins; + this.enforceJoinOrder = enforceJoinOrder; + this.collocated = collocated; + this.lazy = lazy; + this.skipReducerOnUpdate = skipReducerOnUpdate; + this.streamAllowOverwrite = streamAllowOverwrite; + this.streamNodeParOps = streamNodeParOps; + this.streamNodeBufSize = streamNodeBufSize; + this.streamFlushTimeout = streamFlushTimeout; + + streamers = stream ? new HashMap<>() : null; + + log = ctx.log(SqlClientContext.class.getName()); + + ctx.query().registerClientContext(this); + } + + /** + * @return Collocated flag. + */ + public boolean isCollocated() { + return collocated; + } + + /** + * @return Distributed joins flag. + */ + public boolean isDistributedJoins() { + return distributedJoins; + } + + /** + * @return Enforce join order flag. + */ + public boolean isEnforceJoinOrder() { + return enforceJoinOrder; + } + + /** + * @return Lazy query execution flag. + */ + public boolean isLazy() { + return lazy; + } + + /** + * @return Skip reducer on update flag, + */ + public boolean isSkipReducerOnUpdate() { + return skipReducerOnUpdate; + } + + /** + * @return Streaming state flag (on or off). + */ + public boolean isStream() { + return streamers != null; + } + + /** + * @param cacheName Cache name. + * @return Streamer for given cache. 
+ */ + public IgniteDataStreamer<?, ?> streamerForCache(String cacheName) { + Map<String, IgniteDataStreamer<?, ?>> curStreamers = streamers; + + if (curStreamers == null) + return null; + + IgniteDataStreamer<?, ?> res = curStreamers.get(cacheName); + + if (res != null) + return res; + + res = ctx.grid().dataStreamer(cacheName); + + IgniteDataStreamer<?, ?> exStreamer = curStreamers.putIfAbsent(cacheName, res); + + if (exStreamer == null) { + res.autoFlushFrequency(streamFlushTimeout); + + res.allowOverwrite(streamAllowOverwrite); + + if (streamNodeBufSize > 0) + res.perNodeBufferSize(streamNodeBufSize); + + if (streamNodeParOps > 0) + res.perNodeParallelOperations(streamNodeParOps); + + return res; + } + else { // Someone got ahead of us. + res.close(); + + return exStreamer; + } + } + + /** {@inheritDoc} */ + @Override public void close() throws Exception { + ctx.query().unregisterClientContext(this); + + if (streamers == null) + return; + + for (IgniteDataStreamer<?, ?> s : streamers.values()) + U.close(s, log); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/692e4888/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParser.java b/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParser.java index 0627def..70f72a7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParser.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParser.java @@ -122,7 +122,7 @@ public class SqlParser { return cmd; } else - throw errorUnexpectedToken(lex, CREATE, DROP, COPY, ALTER); + throw errorUnexpectedToken(lex, CREATE, DROP, ALTER, COPY); case QUOTED: case MINUS: http://git-wip-us.apache.org/repos/asf/ignite/blob/692e4888/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java index df27c5f..6d7e9ae 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java @@ -52,6 +52,7 @@ import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; import org.apache.ignite.internal.processors.query.GridRunningQueryInfo; import org.apache.ignite.internal.processors.query.QueryField; import org.apache.ignite.internal.processors.query.QueryIndexDescriptorImpl; +import org.apache.ignite.internal.processors.query.SqlClientContext; import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.lang.GridCloseableIterator; @@ -244,12 +245,18 @@ public class IgniteClientCacheInitializationFailTest extends GridCommonAbstractT /** {@inheritDoc} */ @Override public List<FieldsQueryCursor<List<?>>> querySqlFields(String schemaName, SqlFieldsQuery qry, - boolean keepBinary, boolean failOnMultipleStmts, GridQueryCancel cancel) { + SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts, GridQueryCancel cancel) { return null; } /** {@inheritDoc} */ - @Override public long streamUpdateQuery(String spaceName, String qry, @Nullable Object[] params, + @Override public List<Long> streamBatchedUpdateQuery(String schemaName, String qry, List<Object[]> params, + SqlClientContext cliCtx) throws IgniteCheckedException { + return Collections.emptyList(); + } + + /** {@inheritDoc} */ + @Override public long streamUpdateQuery(String 
schemaName, String qry, @Nullable Object[] params, IgniteDataStreamer<?, ?> streamer) throws IgniteCheckedException { return 0; } @@ -372,8 +379,8 @@ public class IgniteClientCacheInitializationFailTest extends GridCommonAbstractT } /** {@inheritDoc} */ - @Override public boolean isInsertStatement(PreparedStatement nativeStmt) { - return false; + @Override public void checkStatementStreamable(PreparedStatement nativeStmt) { + // No-op. } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/692e4888/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java index b114828..8d2a820 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java @@ -299,7 +299,7 @@ public final class GridTestUtils { call.call(); } catch (Throwable e) { - if (cls != e.getClass()) { + if (cls != e.getClass() && !cls.isAssignableFrom(e.getClass())) { if (e.getClass() == CacheException.class && e.getCause() != null && e.getCause().getClass() == cls) e = e.getCause(); else { http://git-wip-us.apache.org/repos/asf/ignite/blob/692e4888/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java index ea6c7c1..284d50a 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java +++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java @@ -385,6 +385,7 @@ public class DmlStatementsProcessor { /** * Perform given statement against given data streamer. Only rows based INSERT is supported. * + * @param schemaName Schema name. * @param streamer Streamer to feed data to. * @param stmt Statement. * @param args Statement arguments. @@ -392,81 +393,74 @@ public class DmlStatementsProcessor { * @throws IgniteCheckedException if failed. */ @SuppressWarnings({"unchecked", "ConstantConditions"}) - long streamUpdateQuery(IgniteDataStreamer streamer, PreparedStatement stmt, final Object[] args) + long streamUpdateQuery(String schemaName, IgniteDataStreamer streamer, PreparedStatement stmt, final Object[] args) throws IgniteCheckedException { + idx.checkStatementStreamable(stmt); + Prepared p = GridSqlQueryParser.prepared(stmt); assert p != null; - final UpdatePlan plan = UpdatePlanBuilder.planForStatement(p, true, idx, null, null, null); + final UpdatePlan plan = getPlanForStatement(schemaName, null, p, null, true, null); - if (!F.eq(streamer.cacheName(), plan.cacheContext().name())) - throw new IgniteSQLException("Cross cache streaming is not supported, please specify cache explicitly" + - " in connection options", IgniteQueryErrorCode.UNSUPPORTED_OPERATION); + assert plan.isLocalSubquery(); - if (plan.mode() == UpdateMode.INSERT && plan.rowCount() > 0) { - assert plan.isLocalSubquery(); + final GridCacheContext cctx = plan.cacheContext(); - final GridCacheContext cctx = plan.cacheContext(); + QueryCursorImpl<List<?>> cur; - QueryCursorImpl<List<?>> cur; + final ArrayList<List<?>> data = new ArrayList<>(plan.rowCount()); - final ArrayList<List<?>> data = new ArrayList<>(plan.rowCount()); - - QueryCursorImpl<List<?>> stepCur = new QueryCursorImpl<>(new Iterable<List<?>>() { - @Override public Iterator<List<?>> iterator() { - try { - Iterator<List<?>> it; - - if (!F.isEmpty(plan.selectQuery())) { - 
GridQueryFieldsResult res = idx.queryLocalSqlFields(idx.schema(cctx.name()), - plan.selectQuery(), F.asList(U.firstNotNull(args, X.EMPTY_OBJECT_ARRAY)), - null, false, 0, null); + QueryCursorImpl<List<?>> stepCur = new QueryCursorImpl<>(new Iterable<List<?>>() { + @Override public Iterator<List<?>> iterator() { + try { + Iterator<List<?>> it; - it = res.iterator(); - } - else - it = plan.createRows(U.firstNotNull(args, X.EMPTY_OBJECT_ARRAY)).iterator(); + if (!F.isEmpty(plan.selectQuery())) { + GridQueryFieldsResult res = idx.queryLocalSqlFields(idx.schema(cctx.name()), + plan.selectQuery(), F.asList(U.firstNotNull(args, X.EMPTY_OBJECT_ARRAY)), + null, false, 0, null); - return new GridQueryCacheObjectsIterator(it, idx.objectContext(), cctx.keepBinary()); + it = res.iterator(); } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } - } - }, null); + else + it = plan.createRows(U.firstNotNull(args, X.EMPTY_OBJECT_ARRAY)).iterator(); - data.addAll(stepCur.getAll()); - - cur = new QueryCursorImpl<>(new Iterable<List<?>>() { - @Override public Iterator<List<?>> iterator() { - return data.iterator(); + return new GridQueryCacheObjectsIterator(it, idx.objectContext(), cctx.keepBinary()); } - }, null); - - if (plan.rowCount() == 1) { - IgniteBiTuple t = plan.processRow(cur.iterator().next()); + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + }, null); - streamer.addData(t.getKey(), t.getValue()); + data.addAll(stepCur.getAll()); - return 1; + cur = new QueryCursorImpl<>(new Iterable<List<?>>() { + @Override public Iterator<List<?>> iterator() { + return data.iterator(); } + }, null); - Map<Object, Object> rows = new LinkedHashMap<>(plan.rowCount()); + if (plan.rowCount() == 1) { + IgniteBiTuple t = plan.processRow(cur.iterator().next()); - for (List<?> row : cur) { - final IgniteBiTuple t = plan.processRow(row); + streamer.addData(t.getKey(), t.getValue()); - rows.put(t.getKey(), t.getValue()); - } + return 1; + } + + 
Map<Object, Object> rows = new LinkedHashMap<>(plan.rowCount()); - streamer.addData(rows); + for (List<?> row : cur) { + final IgniteBiTuple t = plan.processRow(row); - return rows.size(); + rows.put(t.getKey(), t.getValue()); } - else - throw new IgniteSQLException("Only tuple based INSERT statements are supported in streaming mode", - IgniteQueryErrorCode.UNSUPPORTED_OPERATION); + + streamer.addData(rows); + + return rows.size(); } /** @@ -519,7 +513,7 @@ public class DmlStatementsProcessor { .setPageSize(fieldsQry.getPageSize()) .setTimeout(fieldsQry.getTimeout(), TimeUnit.MILLISECONDS); - cur = (QueryCursorImpl<List<?>>)idx.querySqlFields(schemaName, newFieldsQry, true, true, + cur = (QueryCursorImpl<List<?>>)idx.querySqlFields(schemaName, newFieldsQry, null, true, true, cancel).get(0); } else if (plan.hasRows()) @@ -610,7 +604,7 @@ public class DmlStatementsProcessor { * @return Update plan. */ @SuppressWarnings({"unchecked", "ConstantConditions"}) - private UpdatePlan getPlanForStatement(String schema, Connection conn, Prepared p, SqlFieldsQuery fieldsQry, + UpdatePlan getPlanForStatement(String schema, Connection conn, Prepared p, SqlFieldsQuery fieldsQry, boolean loc, @Nullable Integer errKeysPos) throws IgniteCheckedException { H2CachedStatementKey planKey = H2CachedStatementKey.forDmlStatement(schema, p.getSQL(), fieldsQry, loc); http://git-wip-us.apache.org/repos/asf/ignite/blob/692e4888/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 361623c..a0b2c34 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -92,6 +92,7 @@ import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.internal.processors.query.QueryField; import org.apache.ignite.internal.processors.query.QueryIndexDescriptorImpl; import org.apache.ignite.internal.processors.query.QueryUtils; +import org.apache.ignite.internal.processors.query.SqlClientContext; import org.apache.ignite.internal.processors.query.h2.database.H2RowFactory; import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndex; import org.apache.ignite.internal.processors.query.h2.database.io.H2ExtrasInnerIO; @@ -100,6 +101,7 @@ import org.apache.ignite.internal.processors.query.h2.database.io.H2InnerIO; import org.apache.ignite.internal.processors.query.h2.database.io.H2LeafIO; import org.apache.ignite.internal.processors.query.h2.ddl.DdlStatementsProcessor; import org.apache.ignite.internal.processors.query.h2.dml.DmlUtils; +import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan; import org.apache.ignite.internal.processors.query.h2.opt.GridH2DefaultTableEngine; import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase; import org.apache.ignite.internal.processors.query.h2.opt.GridH2PlainRowFactory; @@ -149,7 +151,6 @@ import org.apache.ignite.spi.indexing.IndexingQueryFilterImpl; import org.h2.api.ErrorCode; import org.h2.api.JavaObjectSerializer; import org.h2.command.Prepared; -import org.h2.command.dml.Insert; import org.h2.command.dml.NoOperation; import org.h2.engine.Session; import org.h2.engine.SysProperties; @@ -191,7 +192,7 @@ import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType @SuppressWarnings({"UnnecessaryFullyQualifiedName", "NonFinalStaticVariableUsedInClassInitialization"}) public class IgniteH2Indexing implements GridQueryIndexing { public static final Pattern INTERNAL_CMD_RE = Pattern.compile( - 
"^(create|drop)\\s+index|^alter\\s+table|^copy", Pattern.CASE_INSENSITIVE); + "^(create|drop)\\s+index|^alter\\s+table|^copy|^set|^flush", Pattern.CASE_INSENSITIVE); /* * Register IO for indexes. @@ -500,10 +501,10 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** {@inheritDoc} */ - @Override public PreparedStatement prepareNativeStatement(String schemaName, String sql) throws SQLException { + @Override public PreparedStatement prepareNativeStatement(String schemaName, String sql) { Connection conn = connectionForSchema(schemaName); - return prepareStatement(conn, sql, true); + return prepareStatementAndCaches(conn, sql); } /** @@ -1013,7 +1014,60 @@ public class IgniteH2Indexing implements GridQueryIndexing { throw new IgniteSQLException(e); } - return dmlProc.streamUpdateQuery(streamer, stmt, params); + return dmlProc.streamUpdateQuery(schemaName, streamer, stmt, params); + } + + /** {@inheritDoc} */ + @SuppressWarnings("ForLoopReplaceableByForEach") + @Override public List<Long> streamBatchedUpdateQuery(String schemaName, String qry, List<Object[]> params, + SqlClientContext cliCtx) throws IgniteCheckedException { + if (cliCtx == null || !cliCtx.isStream()) { + U.warn(log, "Connection is not in streaming mode."); + + return zeroBatchedStreamedUpdateResult(params.size()); + } + + final Connection conn = connectionForSchema(schemaName); + + final PreparedStatement stmt = prepareStatementAndCaches(conn, qry); + + if (GridSqlQueryParser.checkMultipleStatements(stmt)) + throw new IgniteSQLException("Multiple statements queries are not supported for streaming mode.", + IgniteQueryErrorCode.UNSUPPORTED_OPERATION); + + checkStatementStreamable(stmt); + + Prepared p = GridSqlQueryParser.prepared(stmt); + + UpdatePlan plan = dmlProc.getPlanForStatement(schemaName, conn, p, null, true, null); + + IgniteDataStreamer<?, ?> streamer = cliCtx.streamerForCache(plan.cacheContext().name()); + + if (streamer != null) { + List<Long> res = new 
ArrayList<>(params.size()); + + for (int i = 0; i < params.size(); i++) + res.add(dmlProc.streamUpdateQuery(schemaName, streamer, stmt, params.get(i))); + + return res; + } + else { + U.warn(log, "Streaming has been turned off by concurrent command."); + + return zeroBatchedStreamedUpdateResult(params.size()); + } + } + + /** + * @param size Result size. + * @return List of given size filled with 0Ls. + */ + private static List<Long> zeroBatchedStreamedUpdateResult(int size) { + Long[] res = new Long[size]; + + Arrays.fill(res, 0L); + + return Arrays.asList(res); } /** @@ -1399,7 +1453,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { fqry.setTimeout(qry.getTimeout(), TimeUnit.MILLISECONDS); final QueryCursor<List<?>> res = - querySqlFields(schemaName, fqry, keepBinary, true, null).get(0); + querySqlFields(schemaName, fqry, null, keepBinary, true, null).get(0); final Iterable<Cache.Entry<K, V>> converted = new Iterable<Cache.Entry<K, V>>() { @Override public Iterator<Cache.Entry<K, V>> iterator() { @@ -1435,19 +1489,19 @@ public class IgniteH2Indexing implements GridQueryIndexing { * Try executing query using native facilities. * * @param schemaName Schema name. - * @param qry Query. + * @param sql Query. * @return Result or {@code null} if cannot parse/process this query. */ - private List<FieldsQueryCursor<List<?>>> tryQueryDistributedSqlFieldsNative(String schemaName, SqlFieldsQuery qry) { + private List<FieldsQueryCursor<List<?>>> tryQueryDistributedSqlFieldsNative(String schemaName, String sql) { // Heuristic check for fast return. - if (!INTERNAL_CMD_RE.matcher(qry.getSql().trim()).find()) + if (!INTERNAL_CMD_RE.matcher(sql.trim()).find()) return null; // Parse. 
SqlCommand cmd; try { - SqlParser parser = new SqlParser(schemaName, qry.getSql()); + SqlParser parser = new SqlParser(schemaName, sql); cmd = parser.nextCommand(); @@ -1455,15 +1509,20 @@ public class IgniteH2Indexing implements GridQueryIndexing { if (parser.nextCommand() != null) return null; - // Currently supported commands are: CREATE/DROP INDEX/COPY/ALTER TABLE + // Currently supported commands are: + // CREATE/DROP INDEX + // COPY + // ALTER TABLE + // SET STREAMING + // FLUSH STREAMER if (!(cmd instanceof SqlCreateIndexCommand || cmd instanceof SqlDropIndexCommand || - cmd instanceof SqlBulkLoadCommand || cmd instanceof SqlAlterTableCommand)) + cmd instanceof SqlAlterTableCommand || cmd instanceof SqlBulkLoadCommand)) return null; } catch (Exception e) { // Cannot parse, return. if (log.isDebugEnabled()) - log.debug("Failed to parse SQL with native parser [qry=" + qry.getSql() + ", err=" + e + ']'); + log.debug("Failed to parse SQL with native parser [qry=" + sql + ", err=" + e + ']'); if (!IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_SQL_PARSER_DISABLE_H2_FALLBACK)) return null; @@ -1473,24 +1532,24 @@ public class IgniteH2Indexing implements GridQueryIndexing { if (e instanceof SqlParseException) code = ((SqlParseException)e).code(); - throw new IgniteSQLException("Failed to parse DDL statement: " + qry.getSql() + ": " + e.getMessage(), + throw new IgniteSQLException("Failed to parse DDL statement: " + sql + ": " + e.getMessage(), code, e); } // Execute. 
if (cmd instanceof SqlBulkLoadCommand) { - FieldsQueryCursor<List<?>> cursor = dmlProc.runNativeDmlStatement(qry.getSql(), cmd); + FieldsQueryCursor<List<?>> cursor = dmlProc.runNativeDmlStatement(sql, cmd); return Collections.singletonList(cursor); } else { try { - FieldsQueryCursor<List<?>> cursor = ddlProc.runDdlStatement(qry.getSql(), cmd); + FieldsQueryCursor<List<?>> cursor = ddlProc.runDdlStatement(sql, cmd); return Collections.singletonList(cursor); } catch (IgniteCheckedException e) { - throw new IgniteSQLException("Failed to execute DDL statement [stmt=" + qry.getSql() + "]: " + throw new IgniteSQLException("Failed to execute DDL statement [stmt=" + sql + "]: " + e.getMessage(), e); } } @@ -1514,8 +1573,8 @@ public class IgniteH2Indexing implements GridQueryIndexing { /** {@inheritDoc} */ @SuppressWarnings("StringEquality") @Override public List<FieldsQueryCursor<List<?>>> querySqlFields(String schemaName, SqlFieldsQuery qry, - boolean keepBinary, boolean failOnMultipleStmts, GridQueryCancel cancel) { - List<FieldsQueryCursor<List<?>>> res = tryQueryDistributedSqlFieldsNative(schemaName, qry); + SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts, GridQueryCancel cancel) { + List<FieldsQueryCursor<List<?>>> res = tryQueryDistributedSqlFieldsNative(schemaName, qry.getSql()); if (res != null) return res; @@ -1553,8 +1612,8 @@ public class IgniteH2Indexing implements GridQueryIndexing { // We may use this cached statement only for local queries and non queries. 
if (qry.isLocal() || !prepared.isQuery()) - return (List<FieldsQueryCursor<List<?>>>)doRunPrepared(schemaName, prepared, qry, null, null, - keepBinary, cancel); + return (List<FieldsQueryCursor<List<?>>>)doRunPrepared(schemaName, prepared, qry, null, cliCtx, + null, keepBinary, cancel); } } @@ -1584,7 +1643,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { firstArg += prepared.getParameters().size(); - res.addAll(doRunPrepared(schemaName, prepared, newQry, twoStepQry, meta, keepBinary, cancel)); + res.addAll(doRunPrepared(schemaName, prepared, newQry, twoStepQry, cliCtx, meta, keepBinary, cancel)); if (parseRes.twoStepQuery() != null && parseRes.twoStepQueryKey() != null && !parseRes.twoStepQuery().explain()) @@ -1600,14 +1659,14 @@ public class IgniteH2Indexing implements GridQueryIndexing { * @param prepared H2 command. * @param qry Fields query with flags. * @param twoStepQry Two-step query if this query must be executed in a distributed way. + * @param cliCtx Client context, or {@code null} if not applicable. * @param meta Metadata for {@code twoStepQry}. * @param keepBinary Whether binary objects must not be deserialized automatically. - * @param cancel Query cancel state holder. - * @return Query result. + * @param cancel Query cancel state holder. @return Query result. */ private List<? 
extends FieldsQueryCursor<List<?>>> doRunPrepared(String schemaName, Prepared prepared, - SqlFieldsQuery qry, GridCacheTwoStepQuery twoStepQry, List<GridQueryFieldMetadata> meta, boolean keepBinary, - GridQueryCancel cancel) { + SqlFieldsQuery qry, GridCacheTwoStepQuery twoStepQry, @Nullable SqlClientContext cliCtx, + List<GridQueryFieldMetadata> meta, boolean keepBinary, GridQueryCancel cancel) { String sqlQry = qry.getSql(); boolean loc = qry.isLocal(); @@ -2276,10 +2335,10 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** {@inheritDoc} */ - @Override public boolean isInsertStatement(PreparedStatement nativeStmt) { - Prepared prep = GridSqlQueryParser.prepared(nativeStmt); - - return prep instanceof Insert; + @Override public void checkStatementStreamable(PreparedStatement nativeStmt) { + if (!GridSqlQueryParser.isStreamableInsertStatement(nativeStmt)) + throw new IgniteSQLException("Only tuple based INSERT statements are supported in streaming mode.", + IgniteQueryErrorCode.UNSUPPORTED_OPERATION); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/692e4888/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java index 6f5b51f..5441e36 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java @@ -106,7 +106,7 @@ public class DdlStatementsProcessor { * @throws IgniteCheckedException On error. 
*/ @SuppressWarnings("unchecked") - public FieldsQueryCursor<List<?>> runDdlStatement(String sql, SqlCommand cmd) throws IgniteCheckedException{ + public FieldsQueryCursor<List<?>> runDdlStatement(String sql, SqlCommand cmd) throws IgniteCheckedException { IgniteInternalFuture fut; try { @@ -211,12 +211,7 @@ public class DdlStatementsProcessor { if (fut != null) fut.get(); - QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(Collections.singletonList - (Collections.singletonList(0L)), null, false); - - resCur.fieldsMeta(UPDATE_RESULT_META); - - return resCur; + return zeroCursor(); } catch (SchemaOperationException e) { throw convert(e); @@ -230,6 +225,19 @@ public class DdlStatementsProcessor { } /** + * @return Single-column, single-row cursor with 0 as number of updated records. + */ + @SuppressWarnings("unchecked") + public static QueryCursorImpl<List<?>> zeroCursor() { + QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(Collections.singletonList + (Collections.singletonList(0L)), null, false); + + resCur.fieldsMeta(UPDATE_RESULT_META); + + return resCur; + } + + /** * Execute DDL statement. * * @param sql SQL. http://git-wip-us.apache.org/repos/asf/ignite/blob/692e4888/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java index 10d485a..98fbb97 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java @@ -505,7 +505,7 @@ public final class UpdatePlan { /** * @return Local subquery flag. 
*/ - @Nullable public boolean isLocalSubquery() { + public boolean isLocalSubquery() { return isLocSubqry; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/692e4888/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java index bced836..d897ac7 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java @@ -85,20 +85,21 @@ public final class UpdatePlanBuilder { * @param loc Local query flag. * @param idx Indexing. * @param conn Connection. - * @param fieldsQuery Original query. + * @param fieldsQry Original query. * @return Update plan. 
*/ public static UpdatePlan planForStatement(Prepared prepared, boolean loc, IgniteH2Indexing idx, - @Nullable Connection conn, @Nullable SqlFieldsQuery fieldsQuery, @Nullable Integer errKeysPos) + @Nullable Connection conn, @Nullable SqlFieldsQuery fieldsQry, @Nullable Integer errKeysPos) throws IgniteCheckedException { - assert !prepared.isQuery(); - GridSqlStatement stmt = new GridSqlQueryParser(false).parse(prepared); if (stmt instanceof GridSqlMerge || stmt instanceof GridSqlInsert) - return planForInsert(stmt, loc, idx, conn, fieldsQuery); + return planForInsert(stmt, loc, idx, conn, fieldsQry); + else if (stmt instanceof GridSqlUpdate || stmt instanceof GridSqlDelete) + return planForUpdate(stmt, loc, idx, conn, fieldsQry, errKeysPos); else - return planForUpdate(stmt, loc, idx, conn, fieldsQuery, errKeysPos); + throw new IgniteSQLException("Unsupported operation: " + prepared.getSQL(), + IgniteQueryErrorCode.UNSUPPORTED_OPERATION); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/692e4888/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java index 04bc212..2d2c25c 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java @@ -1993,6 +1993,18 @@ public class GridSqlQueryParser { } /** + * Check if passed statement is insert statement eligible for streaming. + * + * @param nativeStmt Native statement. + * @return {@code True} if streamable insert. 
+ */ + public static boolean isStreamableInsertStatement(PreparedStatement nativeStmt) { + Prepared prep = prepared(nativeStmt); + + return prep instanceof Insert && INSERT_QUERY.get((Insert)prep) == null; + } + + /** * @param cond Condition. * @param o Object. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/692e4888/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java index 069bdd7..cf8bb2e 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java @@ -140,7 +140,7 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest { SqlFieldsQuery qry = new SqlFieldsQuery("select f.productId, p.name, f.price " + "from FactPurchase f, \"replicated-prod\".DimProduct p where p.id = f.productId "); - for (List<?> o : qryProc.querySqlFields(cache.context(), qry, false, true).get(0).getAll()) { + for (List<?> o : qryProc.querySqlFields(cache.context(), qry, null, false, true).get(0).getAll()) { X.println("___ -> " + o); set1.add((Integer)o.get(0)); @@ -154,7 +154,7 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest { qry = new SqlFieldsQuery("select productId from FactPurchase group by productId"); - for (List<?> o : qryProc.querySqlFields(cache.context(), qry, false, true).get(0).getAll()) { + for (List<?> o : qryProc.querySqlFields(cache.context(), qry, null, false, true).get(0).getAll()) { X.println("___ -> " + o); assertTrue(set0.add((Integer) o.get(0))); @@ -173,7 +173,7 
@@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest { "where p.id = f.productId " + "group by f.productId, p.name"); - for (List<?> o : qryProc.querySqlFields(cache.context(), qry, false, true).get(0).getAll()) { + for (List<?> o : qryProc.querySqlFields(cache.context(), qry, null, false, true).get(0).getAll()) { X.println("___ -> " + o); assertTrue(names.add((String)o.get(0))); @@ -190,7 +190,7 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest { "group by f.productId, p.name " + "having s >= 15"); - for (List<?> o : qryProc.querySqlFields(cache.context(), qry, false, true).get(0).getAll()) { + for (List<?> o : qryProc.querySqlFields(cache.context(), qry, null, false, true).get(0).getAll()) { X.println("___ -> " + o); assertTrue(i(o, 1) >= 15); @@ -203,7 +203,7 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest { qry = new SqlFieldsQuery("select top 3 distinct productId " + "from FactPurchase f order by productId desc "); - for (List<?> o : qryProc.querySqlFields(cache.context(), qry, false, true).get(0).getAll()) { + for (List<?> o : qryProc.querySqlFields(cache.context(), qry, null, false, true).get(0).getAll()) { X.println("___ -> " + o); assertEquals(top--, o.get(0)); @@ -216,7 +216,7 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest { qry = new SqlFieldsQuery("select distinct productId " + "from FactPurchase f order by productId desc limit 2 offset 1"); - for (List<?> o : qryProc.querySqlFields(cache.context(), qry, false, true).get(0).getAll()) { + for (List<?> o : qryProc.querySqlFields(cache.context(), qry, null, false, true).get(0).getAll()) { X.println("___ -> " + o); assertEquals(top--, o.get(0)); @@ -256,13 +256,13 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest { GridTestUtils.assertThrows(log, new Callable<Object>() { @Override public Object call() throws Exception { - 
qryProc.querySqlFields(cache.context(), qry, false, true); + qryProc.querySqlFields(cache.context(), qry, null, false, true); return null; } }, IgniteSQLException.class, "Multiple statements queries are not supported"); - List<FieldsQueryCursor<List<?>>> cursors = qryProc.querySqlFields(cache.context(), qry, false, false); + List<FieldsQueryCursor<List<?>>> cursors = qryProc.querySqlFields(cache.context(), qry, null, false, false); assertEquals(2, cursors.size()); @@ -274,7 +274,7 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest { GridTestUtils.assertThrows(log, new Callable<Object>() { @Override public Object call() throws Exception { - qryProc.querySqlFields(cache.context(), qry, false, false); + qryProc.querySqlFields(cache.context(), qry, null, false, false); return null; }