Repository: ignite Updated Branches: refs/heads/master ae2bf3d6f -> 7366809ed
http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index f666cdd..bde9427 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -263,7 +263,8 @@ public class GridQueryProcessor extends GridProcessorAdapter { idx.cancelAllQueries(); return; - } catch (InterruptedException ignored) { + } + catch (InterruptedException ignored) { U.warn(log, "Interrupted while waiting for active queries cancellation."); Thread.currentThread().interrupt(); @@ -1974,13 +1975,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { */ public List<FieldsQueryCursor<List<?>>> querySqlFields(final SqlFieldsQuery qry, final boolean keepBinary, final boolean failOnMultipleStmts) { - return querySqlFields(null, qry, keepBinary, failOnMultipleStmts); - } - - @SuppressWarnings("unchecked") - public FieldsQueryCursor<List<?>> querySqlFields(final GridCacheContext<?,?> cctx, final SqlFieldsQuery qry, - final boolean keepBinary) { - return querySqlFields(cctx, qry, keepBinary, true).get(0); + return querySqlFields(null, qry, null, keepBinary, failOnMultipleStmts); } /** @@ -1991,7 +1986,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { * @return Cursor. */ public FieldsQueryCursor<List<?>> querySqlFields(final SqlFieldsQuery qry, final boolean keepBinary) { - return querySqlFields(null, qry, keepBinary, true).get(0); + return querySqlFields(null, qry, null, keepBinary, true).get(0); } /** @@ -1999,14 +1994,16 @@ public class GridQueryProcessor extends GridProcessorAdapter { * * @param cctx Cache context. * @param qry Query. + * @param cliCtx Client context. * @param keepBinary Keep binary flag. * @param failOnMultipleStmts If {@code true} the method must throws exception when query contains * more then one SQL statement. * @return Cursor. */ @SuppressWarnings("unchecked") - public List<FieldsQueryCursor<List<?>>> querySqlFields(@Nullable final GridCacheContext<?,?> cctx, - final SqlFieldsQuery qry, final boolean keepBinary, final boolean failOnMultipleStmts) { + public List<FieldsQueryCursor<List<?>>> querySqlFields(@Nullable final GridCacheContext<?, ?> cctx, + final SqlFieldsQuery qry, final SqlClientContext cliCtx, final boolean keepBinary, + final boolean failOnMultipleStmts) { checkxEnabled(); validateSqlFieldsQuery(qry); @@ -2034,7 +2031,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { GridQueryCancel cancel = new GridQueryCancel(); List<FieldsQueryCursor<List<?>>> res = - idx.querySqlFields(schemaName, qry, keepBinary, failOnMultipleStmts, cancel); + idx.querySqlFields(schemaName, qry, cliCtx, keepBinary, failOnMultipleStmts, cancel); if (cctx != null) sendQueryExecutedEvent(qry.getSql(), qry.getArgs(), cctx); @@ -2073,7 +2070,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { * @param schemaName Schema name. * @param streamer Data streamer. * @param qry Query. - * @return Iterator. + * @return Update counter. */ public long streamUpdateQuery(@Nullable final String cacheName, final String schemaName, final IgniteDataStreamer<?, ?> streamer, final String qry, final Object[] args) { @@ -2100,6 +2097,33 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** + * @param schemaName Schema name. + * @param cliCtx Client context. + * @param qry Query. + * @param args Query arguments. + * @return Update counters. + */ + public List<Long> streamBatchedUpdateQuery(final String schemaName, final SqlClientContext cliCtx, + final String qry, final List<Object[]> args) { + if (!busyLock.enterBusy()) + throw new IllegalStateException("Failed to execute query (grid is stopping)."); + + try { + return executeQuery(GridCacheQueryType.SQL_FIELDS, qry, null, new IgniteOutClosureX<List<Long>>() { + @Override public List<Long> applyx() throws IgniteCheckedException { + return idx.streamBatchedUpdateQuery(schemaName, qry, args, cliCtx); + } + }, true); + } + catch (IgniteCheckedException e) { + throw new CacheException(e); + } + finally { + busyLock.leaveBusy(); + } + } + + /** * Execute distributed SQL query. * * @param cctx Cache context. http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/core/src/main/java/org/apache/ignite/internal/processors/query/SqlClientContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/SqlClientContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/SqlClientContext.java new file mode 100644 index 0000000..e8c2932 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/SqlClientContext.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * Container for connection properties passed by various drivers (JDBC drivers, possibly ODBC) having notion of an + * <b>SQL connection</b> - Ignite basically does not have one.<p> + * Also contains anything that a driver may need to share between threads processing queries of logically same client - + * see JDBC thin driver + */ +public class SqlClientContext implements AutoCloseable { + /** Kernal context. */ + private final GridKernalContext ctx; + + /** Distributed joins flag. */ + private final boolean distributedJoins; + + /** Enforce join order flag. */ + private final boolean enforceJoinOrder; + + /** Collocated flag. */ + private final boolean collocated; + + /** Replicated caches only flag. */ + private final boolean replicatedOnly; + + /** Lazy query execution flag. */ + private final boolean lazy; + + /** Skip reducer on update flag. */ + private final boolean skipReducerOnUpdate; + + /** Allow overwrites for duplicate keys on streamed {@code INSERT}s. */ + private boolean streamAllowOverwrite; + + /** Parallel ops count per node for data streamer. */ + private int streamNodeParOps; + + /** Node buffer size for data streamer. */ + private int streamNodeBufSize; + + /** Auto flush frequency for streaming. */ + private long streamFlushTimeout; + + /** Streamers for various caches. */ + private Map<String, IgniteDataStreamer<?, ?>> streamers; + + /** Logger. */ + private final IgniteLogger log; + + /** + * @param ctx Kernal context. + * @param distributedJoins Distributed joins flag. + * @param enforceJoinOrder Enforce join order flag. + * @param collocated Collocated flag. + * @param replicatedOnly Replicated caches only flag. + * @param lazy Lazy query execution flag. + * @param skipReducerOnUpdate Skip reducer on update flag. + */ + public SqlClientContext(GridKernalContext ctx, boolean distributedJoins, boolean enforceJoinOrder, + boolean collocated, boolean replicatedOnly, boolean lazy, boolean skipReducerOnUpdate) { + this.ctx = ctx; + this.distributedJoins = distributedJoins; + this.enforceJoinOrder = enforceJoinOrder; + this.collocated = collocated; + this.replicatedOnly = replicatedOnly; + this.lazy = lazy; + this.skipReducerOnUpdate = skipReducerOnUpdate; + + log = ctx.log(SqlClientContext.class.getName()); + } + + /** + * Turn on streaming on this client context. + * + * @param allowOverwrite Whether streaming should overwrite existing values. + * @param flushFreq Flush frequency for streamers. + * @param perNodeBufSize Per node streaming buffer size. + * @param perNodeParOps Per node streaming parallel operations number. + */ + public void enableStreaming(boolean allowOverwrite, long flushFreq, int perNodeBufSize, int perNodeParOps) { + if (isStream()) + return; + + streamers = new HashMap<>(); + + this.streamAllowOverwrite = allowOverwrite; + this.streamFlushTimeout = flushFreq; + this.streamNodeBufSize = perNodeBufSize; + this.streamNodeParOps = perNodeParOps; + } + + /** + * Turn off streaming on this client context - with closing all open streamers, if any. + */ + public void disableStreaming() { + if (!isStream()) + return; + + Iterator<IgniteDataStreamer<?, ?>> it = streamers.values().iterator(); + + while (it.hasNext()) { + IgniteDataStreamer<?, ?> streamer = it.next(); + + U.close(streamer, log); + + it.remove(); + } + + streamers = null; + } + + /** + * @return Collocated flag. + */ + public boolean isCollocated() { + return collocated; + } + + /** + * @return Distributed joins flag. + */ + public boolean isDistributedJoins() { + return distributedJoins; + } + + /** + * @return Enforce join order flag. + */ + public boolean isEnforceJoinOrder() { + return enforceJoinOrder; + } + + /** + * @return Replicated caches only flag. + */ + public boolean isReplicatedOnly() { + return replicatedOnly; + } + + /** + * @return Lazy query execution flag. + */ + public boolean isLazy() { + return lazy; + } + + /** + * @return Skip reducer on update flag, + */ + public boolean isSkipReducerOnUpdate() { + return skipReducerOnUpdate; + } + + /** + * @return Streaming state flag (on or off). + */ + public boolean isStream() { + return streamers != null; + } + + /** + * @param cacheName Cache name. + * @return Streamer for given cache. + */ + public IgniteDataStreamer<?, ?> streamerForCache(String cacheName) { + if (streamers == null) + return null; + + IgniteDataStreamer<?, ?> res = streamers.get(cacheName); + + if (res != null) + return res; + + res = ctx.grid().dataStreamer(cacheName); + + res.autoFlushFrequency(streamFlushTimeout); + + res.allowOverwrite(streamAllowOverwrite); + + if (streamNodeBufSize > 0) + res.perNodeBufferSize(streamNodeBufSize); + + if (streamNodeParOps > 0) + res.perNodeParallelOperations(streamNodeParOps); + + streamers.put(cacheName, res); + + return res; + } + + /** {@inheritDoc} */ + @Override public void close() throws Exception { + if (streamers == null) + return; + + for (IgniteDataStreamer<?, ?> s : streamers.values()) + U.close(s, log); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlKeyword.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlKeyword.java b/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlKeyword.java index 9a2a865..26b2263 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlKeyword.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlKeyword.java @@ -17,11 +17,10 @@ package org.apache.ignite.internal.sql; -import org.apache.ignite.IgniteException; -import org.apache.ignite.internal.util.typedef.F; - import java.lang.reflect.Field; import java.util.HashSet; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.util.typedef.F; /** * SQL keyword constants. @@ -30,9 +29,15 @@ public class SqlKeyword { /** Keyword: ALTER. */ public static final String ALTER = "ALTER"; + /** Keyword: ALLOW_OVERWRITE */ + public static final String ALLOW_OVERWRITE = "ALLOW_OVERWRITE"; + /** Keyword: ASC. */ public static final String ASC = "ASC"; + /** Keyword: BATCH_SIZE */ + public static final String BATCH_SIZE = "BATCH_SIZE"; + /** Keyword: BIGINT */ public static final String BIGINT = "BIGINT"; @@ -96,6 +101,9 @@ public class SqlKeyword { /** Keyword: FLOAT8. */ public static final String FLOAT8 = "FLOAT8"; + /** Keyword: FLUSH_FREQUENCY. */ + public static final String FLUSH_FREQUENCY = "FLUSH_FREQUENCY"; + /** Keyword: FORMAT. */ public static final String FORMAT = "FORMAT"; @@ -168,9 +176,18 @@ public class SqlKeyword { /** Keyword: NVARCHAR2. */ public static final String NVARCHAR2 = "NVARCHAR2"; + /** Keyword: OFF. */ + public static final String OFF = "OFF"; + /** Keyword: ON. */ public static final String ON = "ON"; + /** Keyword: PER_NODE_PARALLEL_OPERATIONS. */ + public static final String PER_NODE_PARALLEL_OPERATIONS = "PER_NODE_PARALLEL_OPERATIONS"; + + /** Keyword: PER_NODE_BUFFER_SIZE. */ + public static final String PER_NODE_BUFFER_SIZE = "PER_NODE_BUFFER_SIZE"; + /** Keyword: PRECISION. */ public static final String PRECISION = "PRECISION"; @@ -183,6 +200,9 @@ public class SqlKeyword { /** Keyword: RESTRICT. */ public static final String RESTRICT = "RESTRICT"; + /** Keyword: SET. */ + public static final String SET = "SET"; + /** Keyword: SIGNED. */ public static final String SIGNED = "SIGNED"; @@ -195,6 +215,9 @@ public class SqlKeyword { /** Keyword: SPATIAL. */ public static final String SPATIAL = "SPATIAL"; + /** Keyword: STREAMING. */ + public static final String STREAMING = "STREAMING"; + /** Keyword: TABLE. */ public static final String TABLE = "TABLE"; http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParser.java b/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParser.java index 0627def..da6d28e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParser.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParser.java @@ -22,6 +22,7 @@ import org.apache.ignite.internal.sql.command.SqlBulkLoadCommand; import org.apache.ignite.internal.sql.command.SqlCommand; import org.apache.ignite.internal.sql.command.SqlCreateIndexCommand; import org.apache.ignite.internal.sql.command.SqlDropIndexCommand; +import org.apache.ignite.internal.sql.command.SqlSetStreamingCommand; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.sql.SqlKeyword.ALTER; @@ -31,7 +32,9 @@ import static org.apache.ignite.internal.sql.SqlKeyword.DROP; import static org.apache.ignite.internal.sql.SqlKeyword.HASH; import static org.apache.ignite.internal.sql.SqlKeyword.INDEX; import static org.apache.ignite.internal.sql.SqlKeyword.PRIMARY; +import static org.apache.ignite.internal.sql.SqlKeyword.SET; import static org.apache.ignite.internal.sql.SqlKeyword.SPATIAL; +import static org.apache.ignite.internal.sql.SqlKeyword.STREAMING; import static org.apache.ignite.internal.sql.SqlKeyword.TABLE; import static org.apache.ignite.internal.sql.SqlKeyword.UNIQUE; import static org.apache.ignite.internal.sql.SqlParserUtils.errorUnexpectedToken; @@ -110,6 +113,11 @@ public class SqlParser { break; + case SET: + cmd = processSet(); + + break; + case ALTER: cmd = processAlter(); } @@ -122,7 +130,7 @@ public class SqlParser { return cmd; } else - throw errorUnexpectedToken(lex, CREATE, DROP, COPY, ALTER); + throw errorUnexpectedToken(lex, CREATE, DROP, ALTER, COPY, SET); case QUOTED: case MINUS: @@ -137,6 +145,22 @@ public class SqlParser { } /** + * Process SET keyword. + * + * @return Command. + */ + private SqlCommand processSet() { + if (lex.shift() && lex.tokenType() == SqlLexerTokenType.DEFAULT) { + switch (lex.token()) { + case STREAMING: + return new SqlSetStreamingCommand().parse(lex); + } + } + + throw errorUnexpectedToken(lex, STREAMING); + } + + /** * Processes COPY command. * * @return The {@link SqlBulkLoadCommand} command. http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParserUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParserUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParserUtils.java index 829c48c..9ed75ef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParserUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParserUtils.java @@ -123,6 +123,28 @@ public class SqlParserUtils { } /** + * Parse boolean parameter value based on presence of tokens 1, 0, ON, OFF. Not that this is not + * and is not intended to be routine for parsing a boolean literal from TRUE/FALSE. + * @param lex Lexer. + * @return Boolean parameter value. + */ + public static boolean parseBoolean(SqlLexer lex) { + if (lex.shift() && lex.tokenType() == SqlLexerTokenType.DEFAULT) { + switch (lex.token()) { + case SqlKeyword.ON: + case "1": + return true; + + case SqlKeyword.OFF: + case "0": + return false; + } + } + + throw errorUnexpectedToken(lex, SqlKeyword.ON, SqlKeyword.OFF, "1", "0"); + } + + /** * Process name. * * @param lex Lexer. http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlSetStreamingCommand.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlSetStreamingCommand.java b/modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlSetStreamingCommand.java new file mode 100644 index 0000000..c492c61 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlSetStreamingCommand.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.command; + +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.internal.sql.SqlKeyword; +import org.apache.ignite.internal.sql.SqlLexer; +import org.apache.ignite.internal.sql.SqlLexerTokenType; +import org.apache.ignite.internal.sql.SqlParseException; + +import static org.apache.ignite.internal.sql.SqlParserUtils.error; +import static org.apache.ignite.internal.sql.SqlParserUtils.errorUnexpectedToken; +import static org.apache.ignite.internal.sql.SqlParserUtils.parseBoolean; +import static org.apache.ignite.internal.sql.SqlParserUtils.parseInt; + +/** + * SET STREAMING command. + */ +public class SqlSetStreamingCommand implements SqlCommand { + /** Default batch size for driver. */ + private final static int DFLT_STREAM_BATCH_SIZE = IgniteDataStreamer.DFLT_PER_NODE_BUFFER_SIZE * 4; + + /** Whether streaming must be turned on or off by this command. */ + private boolean turnOn; + + /** Whether existing values should be overwritten on keys duplication. */ + private boolean allowOverwrite; + + /** Batch size for driver. */ + private int batchSize = DFLT_STREAM_BATCH_SIZE; + + /** Per node number of parallel operations. */ + private int perNodeParOps; + + /** Per node buffer size. */ + private int perNodeBufSize; + + /** Streamer flush timeout. */ + private long flushFreq; + + /** {@inheritDoc} */ + @Override public SqlCommand parse(SqlLexer lex) { + turnOn = parseBoolean(lex); + + while (lex.lookAhead().tokenType() == SqlLexerTokenType.DEFAULT) { + switch (lex.lookAhead().token()) { + case SqlKeyword.BATCH_SIZE: + lex.shift(); + + checkOffLast(lex); + + batchSize = parseInt(lex); + + if (batchSize <= 0) + throw error(lex, "Invalid batch size (must be positive)."); + + break; + + case SqlKeyword.PER_NODE_BUFFER_SIZE: + lex.shift(); + + checkOffLast(lex); + + perNodeBufSize = parseInt(lex); + + if (perNodeBufSize <= 0) + throw error(lex, "Invalid per node buffer size (must be positive)."); + + break; + + case SqlKeyword.PER_NODE_PARALLEL_OPERATIONS: + lex.shift(); + + checkOffLast(lex); + + perNodeParOps = parseInt(lex); + + if (perNodeParOps <= 0) + throw error(lex, "Invalid per node parallel operations number (must be positive)."); + + break; + + case SqlKeyword.ALLOW_OVERWRITE: + lex.shift(); + + checkOffLast(lex); + + allowOverwrite = parseBoolean(lex); + + break; + + case SqlKeyword.FLUSH_FREQUENCY: + lex.shift(); + + checkOffLast(lex); + + flushFreq = parseInt(lex); + + if (flushFreq <= 0) + throw error(lex, "Invalid flush frequency (must be positive)."); + + break; + + default: + return this; + } + } + + return this; + } + + /** + * Throw an unexpected token exception if this command turns streaming off. + * @param lex Lexer to take unexpected token from. + * @throws SqlParseException if {@link #turnOn} is {@code false}. + */ + private void checkOffLast(SqlLexer lex) throws SqlParseException { + if (!turnOn) { + assert lex.tokenType() == SqlLexerTokenType.DEFAULT; + + throw errorUnexpectedToken(lex); + } + } + + /** + * @return Whether streaming must be turned on or off by this command. + */ + public boolean isTurnOn() { + return turnOn; + } + + /** + * @return Whether existing values should be overwritten on keys duplication. + */ + public boolean allowOverwrite() { + return allowOverwrite; + } + + /** + * @return Batch size for driver. + */ + public int batchSize() { + return batchSize; + } + + /** + * @return Per node number of parallel operations. + */ + public int perNodeParallelOperations() { + return perNodeParOps; + } + + /** + * @return Per node streamer buffer size. + */ + public int perNodeBufferSize() { + return perNodeBufSize; + } + + /** + * @return Streamer flush timeout + */ + public long flushFrequency() { + return flushFreq; + } + + /** {@inheritDoc} */ + @Override public String schemaName() { + return null; + } + + /** {@inheritDoc} */ + @Override public void schemaName(String schemaName) { + // No-op. + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java index df27c5f..6d7e9ae 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java @@ -52,6 +52,7 @@ import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; import org.apache.ignite.internal.processors.query.GridRunningQueryInfo; import org.apache.ignite.internal.processors.query.QueryField; import org.apache.ignite.internal.processors.query.QueryIndexDescriptorImpl; +import org.apache.ignite.internal.processors.query.SqlClientContext; import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.lang.GridCloseableIterator; @@ -244,12 +245,18 @@ public class IgniteClientCacheInitializationFailTest extends GridCommonAbstractT /** {@inheritDoc} */ @Override public List<FieldsQueryCursor<List<?>>> querySqlFields(String schemaName, SqlFieldsQuery qry, - boolean keepBinary, boolean failOnMultipleStmts, GridQueryCancel cancel) { + SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts, GridQueryCancel cancel) { return null; } /** {@inheritDoc} */ - @Override public long streamUpdateQuery(String spaceName, String qry, @Nullable Object[] params, + @Override public List<Long> streamBatchedUpdateQuery(String schemaName, String qry, List<Object[]> params, + SqlClientContext cliCtx) throws IgniteCheckedException { + return Collections.emptyList(); + } + + /** {@inheritDoc} */ + @Override public long streamUpdateQuery(String schemaName, String qry, @Nullable Object[] params, IgniteDataStreamer<?, ?> streamer) throws IgniteCheckedException { return 0; } @@ -372,8 +379,8 @@ public class IgniteClientCacheInitializationFailTest extends GridCommonAbstractT } /** {@inheritDoc} */ - @Override public boolean isInsertStatement(PreparedStatement nativeStmt) { - return false; + @Override public void checkStatementStreamable(PreparedStatement nativeStmt) { + // No-op. } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/core/src/test/java/org/apache/ignite/internal/sql/SqlParserSetStreamingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/sql/SqlParserSetStreamingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/sql/SqlParserSetStreamingSelfTest.java new file mode 100644 index 0000000..65bb599 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/sql/SqlParserSetStreamingSelfTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql; + +import org.apache.ignite.internal.processors.query.QueryUtils; +import org.apache.ignite.internal.sql.command.SqlSetStreamingCommand; + +/** + * Tests for SQL parser: SET STREAMING. + */ +public class SqlParserSetStreamingSelfTest extends SqlParserAbstractSelfTest { + /** + * + */ + public void testParseSetStreaming() { + parseValidate("set streaming on", true, false, 2048, 0, 0, 0); + parseValidate("set streaming 1", true, false, 2048, 0, 0, 0); + parseValidate("set streaming off", false, false, 2048, 0, 0, 0); + parseValidate("set streaming 0", false, false, 2048, 0, 0, 0); + parseValidate("set streaming on batch_size 100", true, false, 100, 0, 0, 0); + parseValidate("set streaming on flush_frequency 500", true, false, 2048, 0, 0, 500); + parseValidate("set streaming on per_node_buffer_size 100", true, false, 2048, 0, 100, 0); + parseValidate("set streaming on per_node_parallel_operations 4", true, false, 2048, 4, 0, 0); + parseValidate("set streaming on allow_overwrite on", true, true, 2048, 0, 0, 0); + parseValidate("set streaming on allow_overwrite off", true, false, 2048, 0, 0, 0); + parseValidate("set streaming on per_node_buffer_size 50 flush_frequency 500 " + + "per_node_parallel_operations 4 allow_overwrite on batch_size 100", true, true, 100, 4, 50, 500); + + assertParseError(QueryUtils.DFLT_SCHEMA, "set", + "Failed to parse SQL statement \"set[*]\": Unexpected end of command (expected: \"STREAMING\")"); + + assertParseError(QueryUtils.DFLT_SCHEMA, "set streaming", + "Failed to parse SQL statement \"set streaming[*]\": Unexpected end of command (expected: " + + "\"ON\", \"OFF\", \"1\", \"0\")"); + + assertParseError(QueryUtils.DFLT_SCHEMA, "set streaming table", + "Failed to parse SQL statement \"set streaming [*]table\": Unexpected token: \"TABLE\" (expected: " + + "\"ON\", \"OFF\", \"1\", \"0\")"); + + assertParseError(QueryUtils.DFLT_SCHEMA, "set streaming -1", + "Failed to parse SQL statement \"set streaming [*]-1\": Unexpected token: \"-\" (expected: " + + "\"ON\", \"OFF\", \"1\", \"0\")"); + + assertParseError(QueryUtils.DFLT_SCHEMA, "set streaming 500", + "Failed to parse SQL statement \"set streaming [*]500\": Unexpected token: \"500\" (expected: " + + "\"ON\", \"OFF\", \"1\", \"0\")"); + + assertParseError(QueryUtils.DFLT_SCHEMA, "set streaming on allow_overwrite", + "set streaming on allow_overwrite[*]\": Unexpected end of command (expected: \"ON\", \"OFF\", \"1\", " + + "\"0\")"); + + assertParseError(QueryUtils.DFLT_SCHEMA, "set streaming 1 batch_size", + "Failed to parse SQL statement \"set streaming 1 batch_size[*]\": Unexpected end of command " + + "(expected: \"[integer]\")"); + + assertParseError(QueryUtils.DFLT_SCHEMA, "set streaming on per_node_parallel_operations -4", + "Failed to parse SQL statement \"set streaming on per_node_parallel_operations -[*]4\": " + + "Invalid per node parallel operations number (must be positive)"); + + assertParseError(QueryUtils.DFLT_SCHEMA, "set streaming on per_node_buffer_size -4", + "Failed to parse SQL statement \"set streaming on per_node_buffer_size -[*]4\": " + + "Invalid per node buffer size (must be positive)"); + + assertParseError(QueryUtils.DFLT_SCHEMA, "set streaming on flush_frequency -4", + "Failed to parse SQL statement \"set streaming on flush_frequency -[*]4\": " + + "Invalid flush frequency (must be positive)"); + + assertParseError(QueryUtils.DFLT_SCHEMA, "set streaming off allow_overwrite", + "Failed to parse SQL statement \"set streaming off [*]allow_overwrite\": Unexpected token: " + + "\"ALLOW_OVERWRITE\""); + + assertParseError(QueryUtils.DFLT_SCHEMA, "set streaming off batch_size", + "Failed to parse SQL statement \"set streaming off [*]batch_size\": Unexpected token: " + + "\"BATCH_SIZE\""); + + assertParseError(QueryUtils.DFLT_SCHEMA, "set streaming off flush_frequency", + "Failed to parse SQL statement \"set streaming off [*]flush_frequency\": Unexpected token: " + + "\"FLUSH_FREQUENCY\""); + + assertParseError(QueryUtils.DFLT_SCHEMA, "set streaming off per_node_buffer_size", + "Failed to parse SQL statement \"set streaming off [*]per_node_buffer_size\": Unexpected token: " + + "\"PER_NODE_BUFFER_SIZE\""); + + assertParseError(QueryUtils.DFLT_SCHEMA, "set streaming off per_node_parallel_operations", + "Failed to parse SQL statement \"set streaming off [*]per_node_parallel_operations\": Unexpected token: " + + "\"PER_NODE_PARALLEL_OPERATIONS\""); + + assertParseError(QueryUtils.DFLT_SCHEMA, "set streaming off table", + "Failed to parse SQL statement \"set streaming off [*]table\": Unexpected token: \"TABLE\""); + } + + /** + * Parse and validate SQL script. + * + * @param sql SQL. + * @param expOn Expected on/off value. + * @param expAllowOverwrite Expected allow overwrite flag. + * @param expBatchSize Expected batch size. + * @param expParOps Expected per-node parallael operations. + * @param expBufSize Expected per node buffer size. + * @param expFlushFreq Expected flush frequency. + */ + private static void parseValidate(String sql, boolean expOn, boolean expAllowOverwrite, int expBatchSize, + int expParOps, int expBufSize, long expFlushFreq) { + SqlSetStreamingCommand cmd = (SqlSetStreamingCommand)new SqlParser(QueryUtils.DFLT_SCHEMA, sql).nextCommand(); + + assertEquals(expOn, cmd.isTurnOn()); + + assertEquals(expAllowOverwrite, cmd.allowOverwrite()); + + assertEquals(expBatchSize, cmd.batchSize()); + + assertEquals(expParOps, cmd.perNodeParallelOperations()); + + assertEquals(expBufSize, cmd.perNodeBufferSize()); + + assertEquals(expFlushFreq, cmd.flushFrequency()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java index b114828..8d2a820 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java @@ -299,7 +299,7 @@ public final class GridTestUtils { call.call(); } catch (Throwable e) { - if (cls != e.getClass()) { + if (cls != e.getClass() && !cls.isAssignableFrom(e.getClass())) { if (e.getClass() == CacheException.class && e.getCause() != null && e.getCause().getClass() == cls) e = e.getCause(); else { http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java index ea6c7c1..62dbd50 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java @@ -43,10 +43,10 @@ import org.apache.ignite.cache.query.BulkLoadContextCursor; import org.apache.ignite.cache.query.FieldsQueryCursor; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.processors.bulkload.BulkLoadCacheWriter; -import org.apache.ignite.internal.processors.bulkload.BulkLoadProcessor; import org.apache.ignite.internal.processors.bulkload.BulkLoadAckClientParameters; +import org.apache.ignite.internal.processors.bulkload.BulkLoadCacheWriter; import org.apache.ignite.internal.processors.bulkload.BulkLoadParser; +import org.apache.ignite.internal.processors.bulkload.BulkLoadProcessor; import org.apache.ignite.internal.processors.bulkload.BulkLoadStreamerWriter; import org.apache.ignite.internal.processors.cache.CacheOperationContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -385,6 +385,7 @@ public class DmlStatementsProcessor { /** * Perform given statement against given data streamer. Only rows based INSERT is supported. * + * @param schemaName Schema name. * @param streamer Streamer to feed data to. * @param stmt Statement. * @param args Statement arguments. @@ -392,81 +393,74 @@ public class DmlStatementsProcessor { * @throws IgniteCheckedException if failed. */ @SuppressWarnings({"unchecked", "ConstantConditions"}) - long streamUpdateQuery(IgniteDataStreamer streamer, PreparedStatement stmt, final Object[] args) + long streamUpdateQuery(String schemaName, IgniteDataStreamer streamer, PreparedStatement stmt, final Object[] args) throws IgniteCheckedException { + idx.checkStatementStreamable(stmt); + Prepared p = GridSqlQueryParser.prepared(stmt); assert p != null; - final UpdatePlan plan = UpdatePlanBuilder.planForStatement(p, true, idx, null, null, null); - - if (!F.eq(streamer.cacheName(), plan.cacheContext().name())) - throw new IgniteSQLException("Cross cache streaming is not supported, please specify cache explicitly" + - " in connection options", IgniteQueryErrorCode.UNSUPPORTED_OPERATION); - - if (plan.mode() == UpdateMode.INSERT && plan.rowCount() > 0) { - assert plan.isLocalSubquery(); + final UpdatePlan plan = getPlanForStatement(schemaName, null, p, null, true, null); - final GridCacheContext cctx = plan.cacheContext(); + assert plan.isLocalSubquery(); - QueryCursorImpl<List<?>> cur; + final GridCacheContext cctx = plan.cacheContext(); - final ArrayList<List<?>> data = new ArrayList<>(plan.rowCount()); + QueryCursorImpl<List<?>> cur; - QueryCursorImpl<List<?>> stepCur = new QueryCursorImpl<>(new Iterable<List<?>>() { - @Override public Iterator<List<?>> iterator() { - try { - Iterator<List<?>> it; + final ArrayList<List<?>> data = new ArrayList<>(plan.rowCount()); - if (!F.isEmpty(plan.selectQuery())) { - GridQueryFieldsResult res = idx.queryLocalSqlFields(idx.schema(cctx.name()), - plan.selectQuery(), F.asList(U.firstNotNull(args, X.EMPTY_OBJECT_ARRAY)), - null, false, 0, null); + QueryCursorImpl<List<?>> stepCur = new QueryCursorImpl<>(new Iterable<List<?>>() { + @Override public Iterator<List<?>> iterator() { + try { + Iterator<List<?>> it; - it = res.iterator(); - } - else - it = plan.createRows(U.firstNotNull(args, X.EMPTY_OBJECT_ARRAY)).iterator(); + if (!F.isEmpty(plan.selectQuery())) { + GridQueryFieldsResult res = idx.queryLocalSqlFields(idx.schema(cctx.name()), + plan.selectQuery(), F.asList(U.firstNotNull(args, X.EMPTY_OBJECT_ARRAY)), + null, false, 0, null); - return new GridQueryCacheObjectsIterator(it, idx.objectContext(), cctx.keepBinary()); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); + it = res.iterator(); } - } - }, null); - - data.addAll(stepCur.getAll()); + else + it = plan.createRows(U.firstNotNull(args, X.EMPTY_OBJECT_ARRAY)).iterator(); - cur = new QueryCursorImpl<>(new Iterable<List<?>>() { - @Override public Iterator<List<?>> iterator() { - return data.iterator(); + return new GridQueryCacheObjectsIterator(it, idx.objectContext(), cctx.keepBinary()); } - }, null); - - if (plan.rowCount() == 1) { - IgniteBiTuple t = plan.processRow(cur.iterator().next()); + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + }, null); - streamer.addData(t.getKey(), t.getValue()); + data.addAll(stepCur.getAll()); - return 1; + cur = new QueryCursorImpl<>(new Iterable<List<?>>() { + @Override public Iterator<List<?>> iterator() { + return data.iterator(); } + }, null); - Map<Object, Object> rows = new LinkedHashMap<>(plan.rowCount()); + if (plan.rowCount() == 1) { + IgniteBiTuple t = plan.processRow(cur.iterator().next()); - for (List<?> row : cur) { - final IgniteBiTuple t = plan.processRow(row); + streamer.addData(t.getKey(), t.getValue()); - rows.put(t.getKey(), t.getValue()); - } + return 1; + } + + Map<Object, Object> rows = new LinkedHashMap<>(plan.rowCount()); - streamer.addData(rows); + for (List<?> row : cur) { + final IgniteBiTuple t = plan.processRow(row); - return rows.size(); + rows.put(t.getKey(), t.getValue()); } - else - throw new IgniteSQLException("Only tuple based INSERT statements are supported in streaming mode", - IgniteQueryErrorCode.UNSUPPORTED_OPERATION); + + streamer.addData(rows); + + return rows.size(); } /** @@ -519,7 +513,7 @@ public class DmlStatementsProcessor { .setPageSize(fieldsQry.getPageSize()) .setTimeout(fieldsQry.getTimeout(), TimeUnit.MILLISECONDS); - cur = (QueryCursorImpl<List<?>>)idx.querySqlFields(schemaName, newFieldsQry, true, true, + cur = (QueryCursorImpl<List<?>>)idx.querySqlFields(schemaName, newFieldsQry, null, true, true, cancel).get(0); } else if (plan.hasRows()) @@ -610,7 +604,7 @@ public class DmlStatementsProcessor { * @return Update plan. */ @SuppressWarnings({"unchecked", "ConstantConditions"}) - private UpdatePlan getPlanForStatement(String schema, Connection conn, Prepared p, SqlFieldsQuery fieldsQry, + UpdatePlan getPlanForStatement(String schema, Connection conn, Prepared p, SqlFieldsQuery fieldsQry, boolean loc, @Nullable Integer errKeysPos) throws IgniteCheckedException { H2CachedStatementKey planKey = H2CachedStatementKey.forDmlStatement(schema, p.getSQL(), fieldsQry, loc); http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java index cfbb7bb..e9d9f90 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java @@ -17,8 +17,16 @@ package org.apache.ignite.internal.processors.query.h2; +import java.lang.reflect.Constructor; +import java.sql.Connection; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.processors.cache.QueryCursorImpl; import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase; import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor; @@ -33,12 +41,7 @@ import org.h2.table.IndexColumn; import org.h2.value.DataType; import org.h2.value.Value; -import java.lang.reflect.Constructor; -import java.sql.Connection; -import java.sql.ResultSetMetaData; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.List; +import static org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.UPDATE_RESULT_META; /** * H2 utility methods. @@ -267,4 +270,17 @@ public class H2Utils { private H2Utils() { // No-op. } + + /** + * @return Single-column, single-row cursor with 0 as number of updated records. + */ + @SuppressWarnings("unchecked") + public static QueryCursorImpl<List<?>> zeroCursor() { + QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(Collections.singletonList + (Collections.singletonList(0L)), null, false); + + resCur.fieldsMeta(UPDATE_RESULT_META); + + return resCur; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 361623c..65f08b2 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -92,6 +92,7 @@ import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.internal.processors.query.QueryField; import org.apache.ignite.internal.processors.query.QueryIndexDescriptorImpl; import org.apache.ignite.internal.processors.query.QueryUtils; +import org.apache.ignite.internal.processors.query.SqlClientContext; import org.apache.ignite.internal.processors.query.h2.database.H2RowFactory; import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndex; import org.apache.ignite.internal.processors.query.h2.database.io.H2ExtrasInnerIO; @@ -100,6 +101,7 @@ import org.apache.ignite.internal.processors.query.h2.database.io.H2InnerIO; import org.apache.ignite.internal.processors.query.h2.database.io.H2LeafIO; import org.apache.ignite.internal.processors.query.h2.ddl.DdlStatementsProcessor; import org.apache.ignite.internal.processors.query.h2.dml.DmlUtils; +import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan; import org.apache.ignite.internal.processors.query.h2.opt.GridH2DefaultTableEngine; import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase; import org.apache.ignite.internal.processors.query.h2.opt.GridH2PlainRowFactory; @@ -120,11 +122,12 @@ import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisito import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.sql.SqlParseException; import org.apache.ignite.internal.sql.SqlParser; -import org.apache.ignite.internal.sql.command.SqlBulkLoadCommand; import org.apache.ignite.internal.sql.command.SqlAlterTableCommand; +import org.apache.ignite.internal.sql.command.SqlBulkLoadCommand; import org.apache.ignite.internal.sql.command.SqlCommand; import org.apache.ignite.internal.sql.command.SqlCreateIndexCommand; import org.apache.ignite.internal.sql.command.SqlDropIndexCommand; +import org.apache.ignite.internal.sql.command.SqlSetStreamingCommand; import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap; import org.apache.ignite.internal.util.GridEmptyCloseableIterator; import org.apache.ignite.internal.util.GridSpinBusyLock; @@ -149,7 +152,6 @@ import org.apache.ignite.spi.indexing.IndexingQueryFilterImpl; import org.h2.api.ErrorCode; import org.h2.api.JavaObjectSerializer; import org.h2.command.Prepared; -import org.h2.command.dml.Insert; import org.h2.command.dml.NoOperation; import org.h2.engine.Session; import org.h2.engine.SysProperties; @@ -191,7 +193,7 @@ import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType @SuppressWarnings({"UnnecessaryFullyQualifiedName", "NonFinalStaticVariableUsedInClassInitialization"}) public class IgniteH2Indexing implements GridQueryIndexing { public static final Pattern INTERNAL_CMD_RE = Pattern.compile( - "^(create|drop)\\s+index|^alter\\s+table|^copy", Pattern.CASE_INSENSITIVE); + "^(create|drop)\\s+index|^alter\\s+table|^copy|^set", Pattern.CASE_INSENSITIVE); /* * Register IO for indexes. @@ -500,10 +502,10 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** {@inheritDoc} */ - @Override public PreparedStatement prepareNativeStatement(String schemaName, String sql) throws SQLException { + @Override public PreparedStatement prepareNativeStatement(String schemaName, String sql) { Connection conn = connectionForSchema(schemaName); - return prepareStatement(conn, sql, true); + return prepareStatementAndCaches(conn, sql); } /** @@ -1013,7 +1015,55 @@ public class IgniteH2Indexing implements GridQueryIndexing { throw new IgniteSQLException(e); } - return dmlProc.streamUpdateQuery(streamer, stmt, params); + return dmlProc.streamUpdateQuery(schemaName, streamer, stmt, params); + } + + /** {@inheritDoc} */ + @SuppressWarnings("ForLoopReplaceableByForEach") + @Override public List<Long> streamBatchedUpdateQuery(String schemaName, String qry, List<Object[]> params, + SqlClientContext cliCtx) throws IgniteCheckedException { + if (cliCtx == null || !cliCtx.isStream()) { + U.warn(log, "Connection is not in streaming mode."); + + return zeroBatchedStreamedUpdateResult(params.size()); + } + + final Connection conn = connectionForSchema(schemaName); + + final PreparedStatement stmt = prepareStatementAndCaches(conn, qry); + + if (GridSqlQueryParser.checkMultipleStatements(stmt)) + throw new IgniteSQLException("Multiple statements queries are not supported for streaming mode.", + IgniteQueryErrorCode.UNSUPPORTED_OPERATION); + + checkStatementStreamable(stmt); + + Prepared p = GridSqlQueryParser.prepared(stmt); + + UpdatePlan plan = dmlProc.getPlanForStatement(schemaName, conn, p, null, true, null); + + IgniteDataStreamer<?, ?> streamer = cliCtx.streamerForCache(plan.cacheContext().name()); + + assert streamer != null; + + List<Long> res = new ArrayList<>(params.size()); + + for (int i = 0; i < params.size(); i++) + res.add(dmlProc.streamUpdateQuery(schemaName, streamer, stmt, params.get(i))); + + return res; + } + + /** + * @param size Result size. + * @return List of given size filled with 0Ls. + */ + private static List<Long> zeroBatchedStreamedUpdateResult(int size) { + Long[] res = new Long[size]; + + Arrays.fill(res, 0); + + return Arrays.asList(res); } /** @@ -1399,7 +1449,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { fqry.setTimeout(qry.getTimeout(), TimeUnit.MILLISECONDS); final QueryCursor<List<?>> res = - querySqlFields(schemaName, fqry, keepBinary, true, null).get(0); + querySqlFields(schemaName, fqry, null, keepBinary, true, null).get(0); final Iterable<Cache.Entry<K, V>> converted = new Iterable<Cache.Entry<K, V>>() { @Override public Iterator<Cache.Entry<K, V>> iterator() { @@ -1435,19 +1485,21 @@ public class IgniteH2Indexing implements GridQueryIndexing { * Try executing query using native facilities. * * @param schemaName Schema name. - * @param qry Query. + * @param sql Query. + * @param cliCtx Client context, or {@code null} if not applicable. * @return Result or {@code null} if cannot parse/process this query. */ - private List<FieldsQueryCursor<List<?>>> tryQueryDistributedSqlFieldsNative(String schemaName, SqlFieldsQuery qry) { + private List<FieldsQueryCursor<List<?>>> tryQueryDistributedSqlFieldsNative(String schemaName, String sql, + @Nullable SqlClientContext cliCtx) { // Heuristic check for fast return. - if (!INTERNAL_CMD_RE.matcher(qry.getSql().trim()).find()) + if (!INTERNAL_CMD_RE.matcher(sql.trim()).find()) return null; // Parse. SqlCommand cmd; try { - SqlParser parser = new SqlParser(schemaName, qry.getSql()); + SqlParser parser = new SqlParser(schemaName, sql); cmd = parser.nextCommand(); @@ -1455,15 +1507,20 @@ public class IgniteH2Indexing implements GridQueryIndexing { if (parser.nextCommand() != null) return null; - // Currently supported commands are: CREATE/DROP INDEX/COPY/ALTER TABLE + // Currently supported commands are: + // CREATE/DROP INDEX + // COPY + // ALTER TABLE + // SET STREAMING if (!(cmd instanceof SqlCreateIndexCommand || cmd instanceof SqlDropIndexCommand || - cmd instanceof SqlBulkLoadCommand || cmd instanceof SqlAlterTableCommand)) + cmd instanceof SqlAlterTableCommand || cmd instanceof SqlBulkLoadCommand || + cmd instanceof SqlSetStreamingCommand)) return null; } catch (Exception e) { // Cannot parse, return. if (log.isDebugEnabled()) - log.debug("Failed to parse SQL with native parser [qry=" + qry.getSql() + ", err=" + e + ']'); + log.debug("Failed to parse SQL with native parser [qry=" + sql + ", err=" + e + ']'); if (!IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_SQL_PARSER_DISABLE_H2_FALLBACK)) return null; @@ -1473,24 +1530,40 @@ public class IgniteH2Indexing implements GridQueryIndexing { if (e instanceof SqlParseException) code = ((SqlParseException)e).code(); - throw new IgniteSQLException("Failed to parse DDL statement: " + qry.getSql() + ": " + e.getMessage(), + throw new IgniteSQLException("Failed to parse DDL statement: " + sql + ": " + e.getMessage(), code, e); } // Execute. if (cmd instanceof SqlBulkLoadCommand) { - FieldsQueryCursor<List<?>> cursor = dmlProc.runNativeDmlStatement(qry.getSql(), cmd); + FieldsQueryCursor<List<?>> cursor = dmlProc.runNativeDmlStatement(sql, cmd); return Collections.singletonList(cursor); } + else if (cmd instanceof SqlSetStreamingCommand) { + if (cliCtx == null) + throw new IgniteSQLException("SET STREAMING command can only be executed from JDBC or ODBC driver."); + + SqlSetStreamingCommand setCmd = (SqlSetStreamingCommand)cmd; + + boolean on = setCmd.isTurnOn(); + + if (on) + cliCtx.enableStreaming(setCmd.allowOverwrite(), setCmd.flushFrequency(), + setCmd.perNodeBufferSize(), setCmd.perNodeParallelOperations()); + else + cliCtx.disableStreaming(); + + return Collections.singletonList(H2Utils.zeroCursor()); + } else { try { - FieldsQueryCursor<List<?>> cursor = ddlProc.runDdlStatement(qry.getSql(), cmd); + FieldsQueryCursor<List<?>> cursor = ddlProc.runDdlStatement(sql, cmd); return Collections.singletonList(cursor); } catch (IgniteCheckedException e) { - throw new IgniteSQLException("Failed to execute DDL statement [stmt=" + qry.getSql() + "]: " + throw new IgniteSQLException("Failed to execute DDL statement [stmt=" + sql + "]: " + e.getMessage(), e); } } @@ -1512,10 +1585,10 @@ public class IgniteH2Indexing implements GridQueryIndexing { /** {@inheritDoc} */ - @SuppressWarnings("StringEquality") + @SuppressWarnings({"StringEquality", "unchecked"}) @Override public List<FieldsQueryCursor<List<?>>> querySqlFields(String schemaName, SqlFieldsQuery qry, - boolean keepBinary, boolean failOnMultipleStmts, GridQueryCancel cancel) { - List<FieldsQueryCursor<List<?>>> res = tryQueryDistributedSqlFieldsNative(schemaName, qry); + @Nullable SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts, GridQueryCancel cancel) { + List<FieldsQueryCursor<List<?>>> res = tryQueryDistributedSqlFieldsNative(schemaName, qry.getSql(), cliCtx); if (res != null) return res; @@ -1553,8 +1626,8 @@ public class IgniteH2Indexing implements GridQueryIndexing { // We may use this cached statement only for local queries and non queries. if (qry.isLocal() || !prepared.isQuery()) - return (List<FieldsQueryCursor<List<?>>>)doRunPrepared(schemaName, prepared, qry, null, null, - keepBinary, cancel); + return (List<FieldsQueryCursor<List<?>>>)doRunPrepared(schemaName, prepared, qry, null, + null, keepBinary, cancel); } } @@ -1602,12 +1675,11 @@ public class IgniteH2Indexing implements GridQueryIndexing { * @param twoStepQry Two-step query if this query must be executed in a distributed way. * @param meta Metadata for {@code twoStepQry}. * @param keepBinary Whether binary objects must not be deserialized automatically. - * @param cancel Query cancel state holder. - * @return Query result. + * @param cancel Query cancel state holder. @return Query result. */ private List<? extends FieldsQueryCursor<List<?>>> doRunPrepared(String schemaName, Prepared prepared, - SqlFieldsQuery qry, GridCacheTwoStepQuery twoStepQry, List<GridQueryFieldMetadata> meta, boolean keepBinary, - GridQueryCancel cancel) { + SqlFieldsQuery qry, GridCacheTwoStepQuery twoStepQry, + List<GridQueryFieldMetadata> meta, boolean keepBinary, GridQueryCancel cancel) { String sqlQry = qry.getSql(); boolean loc = qry.isLocal(); @@ -2276,10 +2348,10 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** {@inheritDoc} */ - @Override public boolean isInsertStatement(PreparedStatement nativeStmt) { - Prepared prep = GridSqlQueryParser.prepared(nativeStmt); - - return prep instanceof Insert; + @Override public void checkStatementStreamable(PreparedStatement nativeStmt) { + if (!GridSqlQueryParser.isStreamableInsertStatement(nativeStmt)) + throw new IgniteSQLException("Streaming mode supports only INSERT commands without subqueries.", + IgniteQueryErrorCode.UNSUPPORTED_OPERATION); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java index 6f5b51f..d4f2b0e 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java @@ -43,6 +43,7 @@ import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.internal.processors.query.QueryEntityEx; import org.apache.ignite.internal.processors.query.QueryField; import org.apache.ignite.internal.processors.query.QueryUtils; +import org.apache.ignite.internal.processors.query.h2.H2Utils; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlAlterTableAddColumn; @@ -106,7 +107,7 @@ public class DdlStatementsProcessor { * @throws IgniteCheckedException On error. */ @SuppressWarnings("unchecked") - public FieldsQueryCursor<List<?>> runDdlStatement(String sql, SqlCommand cmd) throws IgniteCheckedException{ + public FieldsQueryCursor<List<?>> runDdlStatement(String sql, SqlCommand cmd) throws IgniteCheckedException { IgniteInternalFuture fut; try { @@ -211,12 +212,7 @@ public class DdlStatementsProcessor { if (fut != null) fut.get(); - QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(Collections.singletonList - (Collections.singletonList(0L)), null, false); - - resCur.fieldsMeta(UPDATE_RESULT_META); - - return resCur; + return H2Utils.zeroCursor(); } catch (SchemaOperationException e) { throw convert(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java index 10d485a..98fbb97 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java @@ -505,7 +505,7 @@ public final class UpdatePlan { /** * @return Local subquery flag. */ - @Nullable public boolean isLocalSubquery() { + public boolean isLocalSubquery() { return isLocSubqry; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java index bced836..d897ac7 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java @@ -85,20 +85,21 @@ public final class UpdatePlanBuilder { * @param loc Local query flag. * @param idx Indexing. * @param conn Connection. - * @param fieldsQuery Original query. + * @param fieldsQry Original query. * @return Update plan. */ public static UpdatePlan planForStatement(Prepared prepared, boolean loc, IgniteH2Indexing idx, - @Nullable Connection conn, @Nullable SqlFieldsQuery fieldsQuery, @Nullable Integer errKeysPos) + @Nullable Connection conn, @Nullable SqlFieldsQuery fieldsQry, @Nullable Integer errKeysPos) throws IgniteCheckedException { - assert !prepared.isQuery(); - GridSqlStatement stmt = new GridSqlQueryParser(false).parse(prepared); if (stmt instanceof GridSqlMerge || stmt instanceof GridSqlInsert) - return planForInsert(stmt, loc, idx, conn, fieldsQuery); + return planForInsert(stmt, loc, idx, conn, fieldsQry); + else if (stmt instanceof GridSqlUpdate || stmt instanceof GridSqlDelete) + return planForUpdate(stmt, loc, idx, conn, fieldsQry, errKeysPos); else - return planForUpdate(stmt, loc, idx, conn, fieldsQuery, errKeysPos); + throw new IgniteSQLException("Unsupported operation: " + prepared.getSQL(), + IgniteQueryErrorCode.UNSUPPORTED_OPERATION); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java index 04bc212..2d2c25c 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java @@ -1993,6 +1993,18 @@ public class GridSqlQueryParser { } /** + * Check if passed statement is insert statement eligible for streaming. + * + * @param nativeStmt Native statement. + * @return {@code True} if streamable insert. + */ + public static boolean isStreamableInsertStatement(PreparedStatement nativeStmt) { + Prepared prep = prepared(nativeStmt); + + return prep instanceof Insert && INSERT_QUERY.get((Insert)prep) == null; + } + + /** * @param cond Condition. * @param o Object. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java index 069bdd7..cf8bb2e 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java @@ -140,7 +140,7 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest { SqlFieldsQuery qry = new SqlFieldsQuery("select f.productId, p.name, f.price " + "from FactPurchase f, \"replicated-prod\".DimProduct p where p.id = f.productId "); - for (List<?> o : qryProc.querySqlFields(cache.context(), qry, false, true).get(0).getAll()) { + for (List<?> o : qryProc.querySqlFields(cache.context(), qry, null, false, true).get(0).getAll()) { X.println("___ -> " + o); set1.add((Integer)o.get(0)); @@ -154,7 +154,7 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest { qry = new SqlFieldsQuery("select productId from FactPurchase group by productId"); - for (List<?> o : qryProc.querySqlFields(cache.context(), qry, false, true).get(0).getAll()) { + for (List<?> o : qryProc.querySqlFields(cache.context(), qry, null, false, true).get(0).getAll()) { X.println("___ -> " + o); assertTrue(set0.add((Integer) o.get(0))); @@ -173,7 +173,7 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest { "where p.id = f.productId " + "group by f.productId, p.name"); - for (List<?> o : qryProc.querySqlFields(cache.context(), qry, false, true).get(0).getAll()) { + for (List<?> o : qryProc.querySqlFields(cache.context(), qry, null, false, true).get(0).getAll()) { X.println("___ -> " + o); assertTrue(names.add((String)o.get(0))); @@ -190,7 +190,7 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest { "group by f.productId, p.name " + "having s >= 15"); - for (List<?> o : qryProc.querySqlFields(cache.context(), qry, false, true).get(0).getAll()) { + for (List<?> o : qryProc.querySqlFields(cache.context(), qry, null, false, true).get(0).getAll()) { X.println("___ -> " + o); assertTrue(i(o, 1) >= 15); @@ -203,7 +203,7 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest { qry = new SqlFieldsQuery("select top 3 distinct productId " + "from FactPurchase f order by productId desc "); - for (List<?> o : qryProc.querySqlFields(cache.context(), qry, false, true).get(0).getAll()) { + for (List<?> o : qryProc.querySqlFields(cache.context(), qry, null, false, true).get(0).getAll()) { X.println("___ -> " + o); assertEquals(top--, o.get(0)); @@ -216,7 +216,7 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest { qry = new SqlFieldsQuery("select distinct productId " + "from FactPurchase f order by productId desc limit 2 offset 1"); - for (List<?> o : qryProc.querySqlFields(cache.context(), qry, false, true).get(0).getAll()) { + for (List<?> o : qryProc.querySqlFields(cache.context(), qry, null, false, true).get(0).getAll()) { X.println("___ -> " + o); assertEquals(top--, o.get(0)); @@ -256,13 +256,13 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest { GridTestUtils.assertThrows(log, new Callable<Object>() { @Override public Object call() throws Exception { - qryProc.querySqlFields(cache.context(), qry, false, true); + qryProc.querySqlFields(cache.context(), qry, null, false, true); return null; } }, IgniteSQLException.class, "Multiple statements queries are not supported"); - List<FieldsQueryCursor<List<?>>> cursors = qryProc.querySqlFields(cache.context(), qry, false, false); + List<FieldsQueryCursor<List<?>>> cursors = qryProc.querySqlFields(cache.context(), qry, null, false, false); assertEquals(2, cursors.size()); @@ -274,7 +274,7 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest { GridTestUtils.assertThrows(log, new Callable<Object>() { @Override public Object call() throws Exception { - qryProc.querySqlFields(cache.context(), qry, false, false); + qryProc.querySqlFields(cache.context(), qry, null, false, false); return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java index f004453..c01eaa6 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java @@ -170,6 +170,7 @@ import org.apache.ignite.internal.processors.sql.SqlConnectorConfigurationValida import org.apache.ignite.internal.sql.SqlParserBulkLoadSelfTest; import org.apache.ignite.internal.sql.SqlParserCreateIndexSelfTest; import org.apache.ignite.internal.sql.SqlParserDropIndexSelfTest; +import org.apache.ignite.internal.sql.SqlParserSetStreamingSelfTest; import org.apache.ignite.spi.communication.tcp.GridOrderedMessageCancelSelfTest; import org.apache.ignite.testframework.IgniteTestSuite; @@ -187,6 +188,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { suite.addTestSuite(SqlParserCreateIndexSelfTest.class); suite.addTestSuite(SqlParserDropIndexSelfTest.class); suite.addTestSuite(SqlParserBulkLoadSelfTest.class); + suite.addTestSuite(SqlParserSetStreamingSelfTest.class); suite.addTestSuite(SqlConnectorConfigurationValidationSelfTest.class); suite.addTestSuite(ClientConnectorConfigurationValidationSelfTest.class);