This is an automated email from the ASF dual-hosted git repository.

vozerov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 1619dd5  IGNITE-11326: SQL: Common parsing routine for all command types. This closes #6112.
1619dd5 is described below

commit 1619dd5603f9b9eb71dbffc70add54df0a290e2c
Author: devozerov <[email protected]>
AuthorDate: Fri Feb 15 17:05:42 2019 +0300

    IGNITE-11326: SQL: Common parsing routine for all command types. This closes #6112.
---
 .../dht/colocated/GridDhtColocatedCache.java       |   4 +-
 .../cache/distributed/near/GridNearTxLocal.java    |   4 +-
 .../internal/processors/cache/mvcc/MvccUtils.java  |   6 +-
 .../cache/query/GridCacheQueryAdapter.java         |   2 +-
 .../processors/odbc/jdbc/JdbcRequestHandler.java   |   8 +-
 .../processors/odbc/odbc/OdbcRequestHandler.java   |   7 +-
 .../processors/query/GridQueryProcessor.java       |   2 +
 .../processors/query/h2/ConnectionManager.java     |  27 +-
 .../processors/query/h2/H2CachedStatementKey.java  |  13 +-
 .../internal/processors/query/h2/H2Utils.java      |   5 +-
 .../processors/query/h2/IgniteH2Indexing.java      | 531 ++++++-----------
 .../processors/query/h2/PreparedStatementEx.java   |  48 --
 .../query/h2/PreparedStatementExImpl.java          | 648 ---------------------
 .../internal/processors/query/h2/QueryParser.java  | 320 ++++++----
 .../processors/query/h2/QueryParserResultDml.java  |  24 +-
 .../query/h2/QueryParserResultSelect.java          |  76 ++-
 .../processors/query/h2/dml/UpdatePlanBuilder.java | 100 ++--
 .../query/h2/sql/GridSqlQueryParser.java           |  18 +-
 .../processors/query/RunningQueriesTest.java       |  50 --
 .../query/h2/H2StatementCacheSelfTest.java         |  86 ---
 .../query/h2/PreparedStatementExSelfTest.java      |  64 --
 .../IgniteBinaryCacheQueryTestSuite.java           |   4 -
 22 files changed, 600 insertions(+), 1447 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 281669e..1c60aae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -237,7 +237,7 @@ public class GridDhtColocatedCache<K, V> extends 
GridDhtTransactionalCacheAdapte
         if (ctx.mvccEnabled()) {
             try {
                 if (tx != null)
-                    mvccSnapshot = MvccUtils.requestSnapshot(ctx, tx);
+                    mvccSnapshot = MvccUtils.requestSnapshot(tx);
                 else {
                     mvccTracker = MvccUtils.mvccTracker(ctx, null);
 
@@ -342,7 +342,7 @@ public class GridDhtColocatedCache<K, V> extends 
GridDhtTransactionalCacheAdapte
         if (ctx.mvccEnabled()) {
             try {
                 if (tx != null)
-                    mvccSnapshot = MvccUtils.requestSnapshot(ctx, tx);
+                    mvccSnapshot = MvccUtils.requestSnapshot(tx);
                 else {
                     mvccTracker = MvccUtils.mvccTracker(ctx, null);
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index d6fcf50..db673e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -718,7 +718,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter 
implements GridTimeou
         @Nullable final CacheEntryPredicate filter
     ) {
         try {
-            MvccUtils.requestSnapshot(cacheCtx, this);
+            MvccUtils.requestSnapshot(this);
 
             beforePut(cacheCtx, retval, true);
         }
@@ -1898,7 +1898,7 @@ public class GridNearTxLocal extends 
GridDhtTxLocalAdapter implements GridTimeou
         @Nullable final CacheEntryPredicate filter
     ) {
         try {
-            MvccUtils.requestSnapshot(cacheCtx, this);
+            MvccUtils.requestSnapshot(this);
 
             beforeRemove(cacheCtx, retval, true);
         }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
index 225af81..cf7b62b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
@@ -817,7 +817,7 @@ public class MvccUtils {
         if (tx == null)
             tracker = new MvccQueryTrackerImpl(cctx);
         else
-            tracker = new StaticMvccQueryTracker(cctx, requestSnapshot(cctx, 
tx));
+            tracker = new StaticMvccQueryTracker(cctx, requestSnapshot(tx));
 
         if (tracker.snapshot() == null)
             // TODO IGNITE-7388
@@ -827,13 +827,11 @@ public class MvccUtils {
     }
 
     /**
-     * @param cctx Cache context.
      * @param tx Transaction.
      * @throws IgniteCheckedException If failed.
      * @return Mvcc snapshot.
      */
-    public static MvccSnapshot requestSnapshot(GridCacheContext cctx,
-        @NotNull GridNearTxLocal tx) throws IgniteCheckedException {
+    public static MvccSnapshot requestSnapshot(@NotNull GridNearTxLocal tx) 
throws IgniteCheckedException {
         MvccSnapshot snapshot = tx.mvccSnapshot();
 
         if (snapshot == null)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index 9fc02d7..f69dd0c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -594,7 +594,7 @@ public class GridCacheQueryAdapter<T> implements 
CacheQuery<T> {
             GridNearTxLocal tx = cctx.tm().userTx();
 
             if (tx != null)
-                mvccSnapshot = MvccUtils.requestSnapshot(cctx, tx);
+                mvccSnapshot = MvccUtils.requestSnapshot(tx);
             else {
                 mvccTracker = MvccUtils.mvccTracker(cctx, null);
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
index 7a28a19..c2db8e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
@@ -906,8 +906,12 @@ public class JdbcRequestHandler implements 
ClientListenerRequestHandler {
         IgniteBiTuple<Integer, String> firstErr, GridQueryCancel cancel) 
throws QueryCancelledException {
         try {
             if (cliCtx.isStream()) {
-                List<Long> cnt = 
ctx.query().streamBatchedUpdateQuery(qry.getSchema(), cliCtx, qry.getSql(),
-                    qry.batchedArguments());
+                List<Long> cnt = ctx.query().streamBatchedUpdateQuery(
+                    qry.getSchema(),
+                    cliCtx,
+                    qry.getSql(),
+                    qry.batchedArguments()
+                );
 
                 for (int i = 0; i < cnt.size(); i++)
                     updCntsAcc.add(cnt.get(i).intValue());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java
index 43fedae..abc8085 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java
@@ -558,7 +558,12 @@ public class OdbcRequestHandler implements 
ClientListenerRequestHandler {
         try {
             assert cliCtx.isStream();
 
-            ctx.query().streamBatchedUpdateQuery(qry.getSchema(), cliCtx, 
qry.getSql(), qry.batchedArguments());
+            ctx.query().streamBatchedUpdateQuery(
+                qry.getSchema(),
+                cliCtx,
+                qry.getSql(),
+                qry.batchedArguments()
+            );
         }
         catch (Exception e) {
             U.error(log, "Failed to execute batch query [qry=" + qry +']', e);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index e099a1c..95f5e3b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -2321,6 +2321,8 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
      */
     public List<Long> streamBatchedUpdateQuery(final String schemaName, final 
SqlClientContext cliCtx,
         final String qry, final List<Object[]> args) {
+        checkxEnabled();
+
         if (!busyLock.enterBusy())
             throw new IllegalStateException("Failed to execute query (grid is 
stopping).");
 
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ConnectionManager.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ConnectionManager.java
index 8c1e89c..18a842b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ConnectionManager.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ConnectionManager.java
@@ -302,14 +302,25 @@ public class ConnectionManager {
 
         PreparedStatement stmt = cache.get(key);
 
-        if (stmt != null && !stmt.isClosed() && 
!stmt.unwrap(JdbcStatement.class).isCancelled() &&
-            !GridSqlQueryParser.prepared(stmt).needRecompile()) {
-            assert stmt.getConnection() == c;
+        // Nothing found.
+        if (stmt == null)
+            return null;
+
+        // TODO: Remove thread local caching at all. Just keep per-connection 
statement cache.
+        // TODO: https://issues.apache.org/jira/browse/IGNITE-11211
+        // Statement is not from the given connection.
+        if (stmt.getConnection() != c)
+            return null;
+
+        // Is statement still valid?
+        if (
+            stmt.isClosed() ||                                 // Closed.
+            stmt.unwrap(JdbcStatement.class).isCancelled() ||  // Cancelled.
+            GridSqlQueryParser.prepared(stmt).needRecompile() // Outdated 
(schema has been changed concurrently).
+        )
+            return null;
 
-            return stmt;
-        }
-
-        return null;
+        return stmt;
     }
 
     /**
@@ -328,7 +339,7 @@ public class ConnectionManager {
 
             H2CachedStatementKey key = new H2CachedStatementKey(c.getSchema(), 
sql);
 
-            stmt = PreparedStatementExImpl.wrap(prepareStatementNoCache(c, 
sql));
+            stmt = prepareStatementNoCache(c, sql);
 
             cache.put(key, stmt);
         }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2CachedStatementKey.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2CachedStatementKey.java
index 300ed6c..360c2ad 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2CachedStatementKey.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2CachedStatementKey.java
@@ -41,8 +41,8 @@ class H2CachedStatementKey {
      * @param schemaName Schema name.
      * @param sql SQL.
      */
-    H2CachedStatementKey(String schemaName, String sql) {
-        this(schemaName, sql, null, false);
+    public H2CachedStatementKey(String schemaName, String sql) {
+        this(schemaName, sql, null);
     }
 
     /**
@@ -51,19 +51,20 @@ class H2CachedStatementKey {
      * @param schemaName Schema name.
      * @param sql SQL.
      * @param fieldsQry Query with flags.
-     * @param loc DML {@code SELECT} Locality flag.
      */
-    public H2CachedStatementKey(String schemaName, String sql, SqlFieldsQuery 
fieldsQry, boolean loc) {
+    public H2CachedStatementKey(String schemaName, String sql, SqlFieldsQuery 
fieldsQry) {
         this.schemaName = schemaName;
         this.sql = sql;
 
-        if (fieldsQry == null || loc || 
!UpdatePlanBuilder.isSkipReducerOnUpdateQuery(fieldsQry))
+        if (fieldsQry == null)
             this.flags = 0; // flags only relevant for server side updates.
         else {
             this.flags = (byte)(1 +
                 (fieldsQry.isDistributedJoins() ? 2 : 0) +
                 (fieldsQry.isEnforceJoinOrder() ? 4 : 0) +
-                (fieldsQry.isCollocated() ? 8 : 0));
+                (fieldsQry.isCollocated() ? 8 : 0) +
+                (fieldsQry.isLocal() ? 8 : 0)
+            );
         }
     }
 
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
index b86a481..f45e574 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
@@ -102,13 +102,16 @@ import javax.cache.CacheException;
 
 import static 
org.apache.ignite.internal.processors.query.QueryUtils.KEY_FIELD_NAME;
 import static 
org.apache.ignite.internal.processors.query.QueryUtils.VAL_FIELD_NAME;
-import static 
org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.UPDATE_RESULT_META;
 import static 
org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser.prepared;
 
 /**
  * H2 utility methods.
  */
 public class H2Utils {
+    /** Dummy metadata for update result. */
+    public static final List<GridQueryFieldMetadata> UPDATE_RESULT_META =
+        Collections.singletonList(new H2SqlFieldMetadata(null, null, 
"UPDATED", Long.class.getName(), -1, -1));
+
     /** Spatial index class name. */
     private static final String SPATIAL_IDX_CLS =
         
"org.apache.ignite.internal.processors.query.h2.opt.GridH2SpatialIndex";
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 7f24399..12e8f9e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -77,7 +77,6 @@ import 
org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter;
 import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
 import org.apache.ignite.internal.processors.odbc.SqlStateCode;
-import 
org.apache.ignite.internal.processors.query.CacheQueryObjectValueContext;
 import org.apache.ignite.internal.processors.query.EnlistOperation;
 import 
org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
@@ -116,11 +115,8 @@ import 
org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
 import org.apache.ignite.internal.processors.query.h2.opt.QueryContext;
 import org.apache.ignite.internal.processors.query.h2.opt.QueryContextRegistry;
-import org.apache.ignite.internal.processors.query.h2.sql.GridSqlAlias;
-import org.apache.ignite.internal.processors.query.h2.sql.GridSqlAst;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlStatement;
-import org.apache.ignite.internal.processors.query.h2.sql.GridSqlTable;
 import 
org.apache.ignite.internal.processors.query.h2.twostep.GridMapQueryExecutor;
 import 
org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor;
 import 
org.apache.ignite.internal.processors.query.h2.twostep.MapQueryLazyWorker;
@@ -161,7 +157,6 @@ import org.apache.ignite.spi.indexing.IndexingQueryFilter;
 import org.apache.ignite.spi.indexing.IndexingQueryFilterImpl;
 import org.h2.api.ErrorCode;
 import org.h2.api.JavaObjectSerializer;
-import org.h2.command.Prepared;
 import org.h2.engine.Session;
 import org.h2.engine.SysProperties;
 import org.h2.table.IndexColumn;
@@ -175,8 +170,12 @@ import static 
org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.request
 import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.tx;
 import static 
org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.txStart;
 import static 
org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.TEXT;
-import static 
org.apache.ignite.internal.processors.query.h2.PreparedStatementEx.MVCC_CACHE_ID;
-import static 
org.apache.ignite.internal.processors.query.h2.PreparedStatementEx.MVCC_STATE;
+import static 
org.apache.ignite.internal.processors.query.h2.H2Utils.UPDATE_RESULT_META;
+import static 
org.apache.ignite.internal.processors.query.h2.H2Utils.bindParameters;
+import static 
org.apache.ignite.internal.processors.query.h2.H2Utils.generateFieldsQueryString;
+import static org.apache.ignite.internal.processors.query.h2.H2Utils.session;
+import static 
org.apache.ignite.internal.processors.query.h2.H2Utils.validateTypeDescriptor;
+import static 
org.apache.ignite.internal.processors.query.h2.H2Utils.zeroCursor;
 import static 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest.isDataPageScanEnabled;
 
 /**
@@ -199,10 +198,6 @@ public class IgniteH2Indexing implements GridQueryIndexing 
{
         H2ExtrasLeafIO.register();
     }
 
-    /** Dummy metadata for update result. */
-    public static final List<GridQueryFieldMetadata> UPDATE_RESULT_META =
-        Collections.singletonList(new H2SqlFieldMetadata(null, null, 
"UPDATED", Long.class.getName(), -1, -1));
-
     /** Default number of attempts to re-run DELETE and UPDATE queries in case 
of concurrent modifications of values. */
     private static final int DFLT_UPDATE_RERUN_ATTEMPTS = 4;
 
@@ -242,9 +237,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     /** */
     protected volatile GridKernalContext ctx;
 
-    /** Cache object value context. */
-    protected CacheQueryObjectValueContext valCtx;
-
     /** Query context registry. */
     private final QueryContextRegistry qryCtxRegistry = new 
QueryContextRegistry();
 
@@ -288,22 +280,6 @@ public class IgniteH2Indexing implements GridQueryIndexing 
{
         return ctx;
     }
 
-    /**
-     * @param c Connection.
-     * @param sql SQL.
-     * @return <b>Cached</b> prepared statement.
-     */
-    @SuppressWarnings("ConstantConditions")
-    @Nullable private PreparedStatement cachedStatement(Connection c, String 
sql) {
-        try {
-            return connMgr.cachedPreparedStatement(c, sql);
-        }
-        catch (SQLException e) {
-            // We actually don't except anything SQL related here as we're 
supposed to work with cache only.
-            throw new AssertionError(e);
-        }
-    }
-
     /** {@inheritDoc} */
     @Override public PreparedStatement prepareNativeStatement(String 
schemaName, String sql) {
         Connection conn = connMgr.connectionForThread().connection(schemaName);
@@ -490,63 +466,64 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
         assert mvccEnabled || mvccTracker == null;
 
         try {
-            final Connection conn = 
connMgr.connectionForThread().connection(schemaName);
-
-            H2Utils.setupConnection(conn, false, enforceJoinOrder);
+            SqlFieldsQuery fieldsQry = new SqlFieldsQuery(qry)
+                .setLocal(true)
+                .setEnforceJoinOrder(enforceJoinOrder)
+                .setDataPageScanEnabled(dataPageScanEnabled)
+                .setTimeout(qryTimeout, TimeUnit.MILLISECONDS);
 
-            PreparedStatement stmt = preparedStatementWithParams(conn, qry, 
params, true);
+            if (params != null)
+                fieldsQry.setArgs(params.toArray());
 
-            if (GridSqlQueryParser.checkMultipleStatements(stmt))
-                throw new IgniteSQLException("Multiple statements queries are 
not supported for local queries");
+            QueryParserResult parseRes = parser.parse(schemaName, fieldsQry, 
false);
 
-            Prepared p = GridSqlQueryParser.prepared(stmt);
-
-            if (GridSqlQueryParser.isDml(p)) {
-                QueryParserResultDml dml = parser.prepareDmlStatement(p);
-
-                SqlFieldsQuery fldsQry = new SqlFieldsQuery(qry);
-
-                if (params != null)
-                    fldsQry.setArgs(params.toArray());
-
-                fldsQry.setEnforceJoinOrder(enforceJoinOrder);
-                fldsQry.setTimeout(qryTimeout, TimeUnit.MILLISECONDS);
-                fldsQry.setDataPageScanEnabled(dataPageScanEnabled);
-
-                UpdateResult updRes = executeUpdate(schemaName, conn, dml, 
fldsQry, true, filter, cancel);
+            if (parseRes.isDml()) {
+                UpdateResult updRes = executeUpdate(schemaName, 
parseRes.dml(), fieldsQry, true, filter, cancel);
 
                 List<?> updResRow = 
Collections.singletonList(updRes.counter());
 
                 return new GridQueryFieldsResultAdapter(UPDATE_RESULT_META, 
new IgniteSingletonIterator<>(updResRow));
             }
-            else if (CommandProcessor.isCommand(p)) {
+            else if (parseRes.isCommand()) {
                 throw new IgniteSQLException("DDL statements are supported for 
the whole cluster only.",
                     IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
             }
 
+            assert parseRes.isSelect();
+
+            QueryParserResultSelect select = parseRes.select();
+
+            assert select != null;
+
             MvccSnapshot mvccSnapshot = null;
 
-            boolean forUpdate = GridSqlQueryParser.isForUpdateQuery(p);
+            boolean forUpdate = select.forUpdate();
 
             if (forUpdate && !mvccEnabled)
                 throw new IgniteSQLException("SELECT FOR UPDATE query requires 
transactional " +
                     "cache with MVCC enabled.", 
IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
 
-            if (ctx.security().enabled()) {
-                GridSqlQueryParser parser = new GridSqlQueryParser(false);
-
-                parser.parse(p);
-
-                checkSecurity(parser.cacheIds());
-            }
+            if (ctx.security().enabled())
+                checkSecurity(select.cacheIds());
 
             GridNearTxSelectForUpdateFuture sfuFut = null;
 
             int opTimeout = qryTimeout;
 
             if (mvccEnabled) {
-                if (mvccTracker == null)
-                    mvccTracker = mvccTracker(stmt, startTx);
+                if (mvccTracker == null) {
+                    Integer mvccCacheId = select.mvccCacheId();
+
+                    if (mvccCacheId != null) {
+                        GridCacheContext mvccCacheCtx = 
ctx.cache().context().cacheContext(select.mvccCacheId());
+
+                        if (mvccCacheCtx == null)
+                            throw new IgniteCheckedException("Cache has been 
stopped concurrently [cacheId=" +
+                                mvccCacheId + ']');
+
+                        mvccTracker = MvccUtils.mvccTracker(mvccCacheCtx, 
startTx);
+                    }
+                }
 
                 if (mvccTracker != null) {
                     mvccSnapshot = mvccTracker.snapshot();
@@ -561,11 +538,7 @@ public class IgniteH2Indexing implements GridQueryIndexing 
{
                         throw new IgniteSQLException("SELECT FOR UPDATE query 
requires transactional " +
                             "cache with MVCC enabled.", 
IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
 
-                    GridSqlStatement stmt0 = new 
GridSqlQueryParser(false).parse(p);
-
-                    qry = 
GridSqlQueryParser.rewriteQueryForUpdateIfNeeded(stmt0, forUpdate = tx != null);
-
-                    stmt = preparedStatementWithParams(conn, qry, params, 
true);
+                    qry = 
GridSqlQueryParser.rewriteQueryForUpdateIfNeeded(select.statement(), forUpdate 
= tx != null);
 
                     if (forUpdate) {
                         GridCacheContext cctx = mvccTracker.context();
@@ -588,7 +561,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             GridNearTxLocal tx0 = tx;
             MvccQueryTracker mvccTracker0 = mvccTracker;
             GridNearTxSelectForUpdateFuture sfuFut0 = sfuFut;
-            PreparedStatement stmt0 = stmt;
             String qry0 = qry;
             int timeout0 = opTimeout;
 
@@ -606,10 +578,27 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
 
                     qryCtxRegistry.setThreadLocal(qctx);
 
-                    ThreadLocalObjectPool<H2ConnectionWrapper>.Reusable 
detachedConn = connMgr.detachThreadConnection();
+                    ThreadLocalObjectPool<H2ConnectionWrapper>.Reusable conn = 
connMgr.detachThreadConnection();
 
                     try {
-                        ResultSet rs = executeSqlQueryWithTimer(stmt0, conn, 
qry0, params, timeout0, cancel, dataPageScanEnabled);
+                        Connection conn0 = 
conn.object().connection(schemaName);
+
+                        PreparedStatement stmt = preparedStatementWithParams(
+                            conn0,
+                            qry0,
+                            params,
+                            true
+                        );
+
+                        ResultSet rs = executeSqlQueryWithTimer(
+                            stmt,
+                            conn0,
+                            qry0,
+                            params,
+                            timeout0,
+                            cancel,
+                            dataPageScanEnabled
+                        );
 
                         if (sfuFut0 != null) {
                             assert tx0.mvccSnapshot() != null;
@@ -662,10 +651,10 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
                             }
                         }
 
-                        return new H2FieldsIterator(rs, mvccTracker0, sfuFut0 
!= null, detachedConn);
+                        return new H2FieldsIterator(rs, mvccTracker0, sfuFut0 
!= null, conn);
                     }
                     catch (IgniteCheckedException | RuntimeException | Error 
e) {
-                        detachedConn.recycle();
+                        conn.recycle();
 
                         try {
                             if (mvccTracker0 != null)
@@ -707,56 +696,46 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
     }
 
     /** {@inheritDoc} */
-    @Override public long streamUpdateQuery(String schemaName, String qry,
-        @Nullable Object[] params, IgniteDataStreamer<?, ?> streamer) throws 
IgniteCheckedException {
-        final Connection conn = 
connMgr.connectionForThread().connection(schemaName);
-
-        final PreparedStatement stmt;
-
-        try {
-            stmt = connMgr.prepareStatement(conn, qry);
-        }
-        catch (SQLException e) {
-            throw new IgniteSQLException(e);
-        }
+    @Override public long streamUpdateQuery(
+        String schemaName,
+        String qry,
+        @Nullable Object[] params,
+        IgniteDataStreamer<?, ?> streamer
+    ) throws IgniteCheckedException {
+        QueryParserResultDml dml = streamerParse(schemaName, qry);
 
-        return streamQuery0(qry, schemaName, streamer, stmt, params);
+        return streamQuery0(qry, schemaName, streamer, dml, params);
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings("ForLoopReplaceableByForEach")
-    @Override public List<Long> streamBatchedUpdateQuery(String schemaName, 
String qry, List<Object[]> params,
-        SqlClientContext cliCtx) throws IgniteCheckedException {
+    @SuppressWarnings({"ForLoopReplaceableByForEach", "ConstantConditions"})
+    @Override public List<Long> streamBatchedUpdateQuery(
+        String schemaName,
+        String qry,
+        List<Object[]> params,
+        SqlClientContext cliCtx
+    ) throws IgniteCheckedException {
         if (cliCtx == null || !cliCtx.isStream()) {
             U.warn(log, "Connection is not in streaming mode.");
 
             return zeroBatchedStreamedUpdateResult(params.size());
         }
 
-        final Connection conn = 
connMgr.connectionForThread().connection(schemaName);
-
-        final PreparedStatement stmt = prepareStatementAndCaches(conn, qry);
-
-        if (GridSqlQueryParser.checkMultipleStatements(stmt))
-            throw new IgniteSQLException("Multiple statements queries are not 
supported for streaming mode.",
-                IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
-
-        checkStatementStreamable(stmt);
-
-        QueryParserResultDml dml = parser.prepareDmlStatement(stmt);
+        QueryParserResultDml dml = streamerParse(schemaName, qry);
 
-        UpdatePlan plan = updatePlan(schemaName, conn, dml, null, true);
-
-        IgniteDataStreamer<?, ?> streamer = 
cliCtx.streamerForCache(plan.cacheContext().name());
+        IgniteDataStreamer<?, ?> streamer = 
cliCtx.streamerForCache(dml.streamTable().cacheName());
 
         assert streamer != null;
 
-        List<Long> res = new ArrayList<>(params.size());
+        List<Long> ress = new ArrayList<>(params.size());
 
-        for (int i = 0; i < params.size(); i++)
-            res.add(streamQuery0(qry, schemaName, streamer, stmt, 
params.get(i)));
+        for (int i = 0; i < params.size(); i++) {
+            long res = streamQuery0(qry, schemaName, streamer, dml, 
params.get(i));
 
-        return res;
+            ress.add(res);
+        }
+
+        return ress;
     }
 
     /**
@@ -765,77 +744,51 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
      * @param qry Query.
      * @param schemaName Schema name.
      * @param streamer Streamer to feed data to.
-     * @param stmt Statement.
+     * @param dml DML statement.
      * @param args Statement arguments.
      * @return Number of rows in given INSERT statement.
      * @throws IgniteCheckedException if failed.
      */
     @SuppressWarnings({"unchecked", "Anonymous2MethodRef"})
-    private long streamQuery0(String qry, String schemaName, 
IgniteDataStreamer streamer, PreparedStatement stmt,
+    private long streamQuery0(String qry, String schemaName, 
IgniteDataStreamer streamer, QueryParserResultDml dml,
         final Object[] args) throws IgniteCheckedException {
         Long qryId = runningQryMgr.register(qry, 
GridCacheQueryType.SQL_FIELDS, schemaName, true, null);
 
         boolean fail = false;
 
         try {
-            checkStatementStreamable(stmt);
+            UpdatePlan plan = updatePlan(schemaName, dml, null);
 
-            QueryParserResultDml dml = parser.prepareDmlStatement(stmt);
+            List<List<?>> planRows = plan.createRows(args != null ? args : 
X.EMPTY_OBJECT_ARRAY);
 
-            final UpdatePlan plan = updatePlan(schemaName, null, dml, null, 
true);
-
-            assert plan.isLocalSubquery();
-
-            final GridCacheContext cctx = plan.cacheContext();
-
-            QueryCursorImpl<List<?>> cur;
-
-            final ArrayList<List<?>> data = new ArrayList<>(plan.rowCount());
-
-            QueryCursorImpl<List<?>> stepCur = new QueryCursorImpl<>(new 
Iterable<List<?>>() {
-                @Override public Iterator<List<?>> iterator() {
-                    try {
-                        assert F.isEmpty(plan.selectQuery());
-
-                        Object[] params = args != null ? args : 
X.EMPTY_OBJECT_ARRAY;
-
-                        Iterator<List<?>> it = 
plan.createRows(params).iterator();
-
-                        return new GridQueryCacheObjectsIterator(it, 
objectContext(), cctx.keepBinary());
-                    }
-                    catch (IgniteCheckedException e) {
-                        throw new IgniteException(e);
-                    }
-                }
-            }, null);
-
-            data.addAll(stepCur.getAll());
-
-            cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
-                @Override public Iterator<List<?>> iterator() {
-                    return data.iterator();
-                }
-            }, null);
+            Iterator<List<?>> iter = new GridQueryCacheObjectsIterator(
+                planRows.iterator(),
+                objectContext(),
+                true
+            );
 
-            if (plan.rowCount() == 1) {
-                IgniteBiTuple t = plan.processRow(cur.iterator().next());
+            if (planRows.size() == 1) {
+                IgniteBiTuple t = plan.processRow(iter.next());
 
                 streamer.addData(t.getKey(), t.getValue());
 
                 return 1;
             }
+            else {
+                Map<Object, Object> rows = new 
LinkedHashMap<>(plan.rowCount());
 
-            Map<Object, Object> rows = new LinkedHashMap<>(plan.rowCount());
+                while (iter.hasNext()) {
+                    List<?> row = iter.next();
 
-            for (List<?> row : cur) {
-                final IgniteBiTuple t = plan.processRow(row);
+                    IgniteBiTuple t = plan.processRow(row);
 
-                rows.put(t.getKey(), t.getValue());
-            }
+                    rows.put(t.getKey(), t.getValue());
+                }
 
-            streamer.addData(rows);
+                streamer.addData(rows);
 
-            return rows.size();
+                return rows.size();
+            }
         }
         catch (IgniteCheckedException e) {
             fail = true;
@@ -848,6 +801,26 @@ public class IgniteH2Indexing implements GridQueryIndexing 
{
     }
 
     /**
+     * Parse statement for streamer.
+     *
+     * @param schemaName Schema name.
+     * @param qry Query.
+     * @return DML.
+     */
+    private QueryParserResultDml streamerParse(String schemaName, String qry) {
+        QueryParserResult parseRes = parser.parse(schemaName, new 
SqlFieldsQuery(qry), false);
+
+        QueryParserResultDml dml = parseRes.dml();
+
+        if (dml == null || !dml.streamable()) {
+            throw new IgniteSQLException("Streaming mode supports only INSERT 
commands without subqueries.",
+                IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+        }
+
+        return dml;
+    }
+
+    /**
      * @param size Result size.
      * @return List of given size filled with 0Ls.
      */
@@ -880,7 +853,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             throw new IgniteCheckedException("Failed to parse SQL query: " + 
sql, e);
         }
 
-        H2Utils.bindParameters(stmt, params);
+        bindParameters(stmt, params);
 
         return stmt;
     }
@@ -915,7 +888,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             });
         }
 
-        Session ses = H2Utils.session(conn);
+        Session ses = session(conn);
 
         if (timeoutMillis > 0)
             ses.setQueryTimeout(timeoutMillis);
@@ -1102,99 +1075,6 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
     }
 
     /**
-     * Initialises MVCC filter and returns MVCC query tracker if needed.
-     * @param stmt Prepared statement.
-     * @param startTx Start transaction flag.
-     * @return MVCC query tracker or {@code null} if MVCC is disabled for 
involved caches.
-     */
-    private MvccQueryTracker mvccTracker(PreparedStatement stmt, boolean 
startTx) throws IgniteCheckedException {
-        boolean mvccEnabled;
-
-        GridCacheContext mvccCacheCtx = null;
-
-        try {
-            if (stmt.isWrapperFor(PreparedStatementEx.class)) {
-                PreparedStatementEx stmtEx = 
stmt.unwrap(PreparedStatementEx.class);
-
-                Boolean mvccState = stmtEx.meta(MVCC_STATE);
-
-                mvccEnabled = mvccState != null ? mvccState : checkMvcc(stmt);
-
-                if (mvccEnabled) {
-                    Integer cacheId = stmtEx.meta(MVCC_CACHE_ID);
-
-                    assert cacheId != null;
-
-                    mvccCacheCtx = ctx.cache().context().cacheContext(cacheId);
-
-                    assert mvccCacheCtx != null;
-                }
-            }
-            else
-                mvccEnabled = checkMvcc(stmt);
-        }
-        catch (SQLException e) {
-            throw new IgniteSQLException(e);
-        }
-
-        assert !mvccEnabled || mvccCacheCtx != null;
-
-        return mvccEnabled ? MvccUtils.mvccTracker(mvccCacheCtx, startTx) : 
null;
-    }
-
-    /**
-     * Checks if statement uses MVCC caches. If it does, additional metadata 
is added to statement.
-     *
-     * @param stmt Statement to check.
-     * @return {@code True} if there MVCC cache involved in statement.
-     * @throws SQLException If parser failed.
-     */
-    private static Boolean checkMvcc(PreparedStatement stmt) throws 
SQLException {
-        GridSqlQueryParser parser = new GridSqlQueryParser(false);
-
-        parser.parse(GridSqlQueryParser.prepared(stmt));
-
-        Boolean mvccEnabled = null;
-        Integer mvccCacheId = null;
-        GridCacheContext ctx0 = null;
-
-        for (Object o : parser.objectsMap().values()) {
-            if (o instanceof GridSqlAlias)
-                o = GridSqlAlias.unwrap((GridSqlAst) o);
-            if (o instanceof GridSqlTable && ((GridSqlTable) o).dataTable() != 
null) {
-                GridCacheContext cctx = 
((GridSqlTable)o).dataTable().cacheContext();
-
-                assert cctx != null;
-
-                if (mvccEnabled == null) {
-                    mvccEnabled = cctx.mvccEnabled();
-                    mvccCacheId = cctx.cacheId();
-                    ctx0 = cctx;
-                }
-                else if (mvccEnabled != cctx.mvccEnabled())
-                    
MvccUtils.throwAtomicityModesMismatchException(ctx0.config(), cctx.config());
-            }
-        }
-
-        if (mvccEnabled == null)
-            return false;
-
-        // Remember mvccEnabled flag to avoid further additional parsing if 
statement obtained from the statement cache.
-        if (stmt.isWrapperFor(PreparedStatementEx.class)) {
-            PreparedStatementEx stmtEx = 
stmt.unwrap(PreparedStatementEx.class);
-
-            if (mvccEnabled) {
-                stmtEx.putMeta(MVCC_CACHE_ID, mvccCacheId);
-                stmtEx.putMeta(MVCC_STATE, Boolean.TRUE);
-            }
-            else
-                stmtEx.putMeta(MVCC_STATE, FALSE);
-        }
-
-        return mvccEnabled;
-    }
-
-    /**
      * @param schemaName Schema name.
      * @param qry Query.
      * @param keepCacheObj Flag to keep cache object.
@@ -1273,27 +1153,6 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
         }
     }
 
-    /**
-     * Run DML on remote nodes.
-     *
-     * @param schemaName Schema name.
-     * @param fieldsQry Initial update query.
-     * @param cacheIds Cache identifiers.
-     * @param isReplicatedOnly Whether query uses only replicated caches.
-     * @param cancel Cancel state.
-     * @return Update result.
-     */
-    UpdateResult runDistributedUpdate(
-        String schemaName,
-        SqlFieldsQuery fieldsQry,
-        List<Integer> cacheIds,
-        boolean isReplicatedOnly,
-        GridQueryCancel cancel) {
-        return rdcQryExec.update(schemaName, cacheIds, fieldsQry.getSql(), 
fieldsQry.getArgs(),
-            fieldsQry.isEnforceJoinOrder(), fieldsQry.getPageSize(), 
fieldsQry.getTimeout(),
-            fieldsQry.getPartitions(), isReplicatedOnly, cancel);
-    }
-
     /** {@inheritDoc} */
     @SuppressWarnings("deprecation")
     @Override public SqlFieldsQuery generateFieldsQuery(String cacheName, 
SqlQuery qry) {
@@ -1310,7 +1169,7 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
         String sql;
 
         try {
-            sql = H2Utils.generateFieldsQueryString(qry.getSql(), 
qry.getAlias(), tblDesc);
+            sql = generateFieldsQueryString(qry.getSql(), qry.getAlias(), 
tblDesc);
         }
         catch (IgniteCheckedException e) {
             throw new IgniteException(e);
@@ -1343,14 +1202,14 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
      * @param cmd Command (native).
      * @return Result.
      */
-    public FieldsQueryCursor<List<?>> executeCommand(
+    private FieldsQueryCursor<List<?>> executeCommand(
         String schemaName,
         SqlFieldsQuery qry,
         @Nullable SqlClientContext cliCtx,
         QueryParserResultCommand cmd
     ) {
         if (cmd.noOp())
-            return H2Utils.zeroCursor();
+            return zeroCursor();
 
         SqlCommand cmdNative = cmd.commandNative();
         GridSqlStatement cmdH2 = cmd.commandH2();
@@ -1391,9 +1250,13 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
     private void checkClusterState(QueryParserResult parseRes) {
         if (!ctx.state().publicApiActiveState(true)) {
             if (parseRes.isCommand()) {
-                SqlCommand cmd = parseRes.command().commandNative();
+                QueryParserResultCommand cmd = parseRes.command();
+
+                assert cmd != null;
 
-                if (cmd instanceof SqlCommitTransactionCommand || cmd 
instanceof SqlRollbackTransactionCommand)
+                SqlCommand cmd0 = cmd.commandNative();
+
+                if (cmd0 instanceof SqlCommitTransactionCommand || cmd0 
instanceof SqlRollbackTransactionCommand)
                     return;
             }
 
@@ -1416,13 +1279,10 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
             SqlFieldsQuery remainingQry = qry;
 
             while (remainingQry != null) {
-                QueryParserResult parseRes = parser.parse(schemaName, 
remainingQry);
+                QueryParserResult parseRes = parser.parse(schemaName, 
remainingQry, !failOnMultipleStmts);
 
                 remainingQry = parseRes.remainingQuery();
 
-                if (remainingQry != null && failOnMultipleStmts)
-                    throw new IgniteSQLException("Multiple statements queries 
are not supported");
-
                 SqlFieldsQuery newQry = parseRes.query();
 
                 assert newQry.getSql() != null;
@@ -1510,12 +1370,10 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
             boolean fail = false;
 
             try {
-                Connection conn = 
connMgr.connectionForThread().connection(schemaName);
-
                 if (!loc)
-                    return executeUpdateDistributed(schemaName, conn, dml , 
qry, cancel);
+                    return executeUpdateDistributed(schemaName, dml , qry, 
cancel);
                 else {
-                    UpdateResult updRes = executeUpdate(schemaName, conn, dml 
, qry, true, filter, cancel);
+                    UpdateResult updRes = executeUpdate(schemaName, dml , qry, 
true, filter, cancel);
 
                     return Collections.singletonList(new QueryCursorImpl<>(new 
Iterable<List<?>>() {
                         @SuppressWarnings("NullableProblems")
@@ -1543,6 +1401,8 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
             // Distributed query.
             GridCacheTwoStepQuery twoStepQry = select.twoStepQuery();
 
+            assert twoStepQry != null;
+
             if (ctx.security().enabled())
                 checkSecurity(twoStepQry.cacheIds());
 
@@ -1624,7 +1484,7 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
      * @return {@code True} if need to start transaction.
      */
     @SuppressWarnings("SimplifiableIfStatement")
-    public boolean autoStartTx(SqlFieldsQuery qry) {
+    private boolean autoStartTx(SqlFieldsQuery qry) {
         if (!mvccEnabled(ctx))
             return false;
 
@@ -1672,18 +1532,17 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
             loc = false;
         }
 
-        Connection conn = connMgr.connectionForThread().connection(schema);
+        QueryParserResult parseRes = parser.parse(schema, fldsQry, false);
 
-        H2Utils.setupConnection(conn, false, fldsQry.isEnforceJoinOrder());
+        assert parseRes.remainingQuery() == null;
 
-        PreparedStatement stmt = preparedStatementWithParams(conn, 
fldsQry.getSql(),
-            F.asList(fldsQry.getArgs()), true);
+        QueryParserResultDml dml = parseRes.dml();
 
-        IndexingQueryFilter filter = backupFilter(topVer, parts);
+        assert dml != null;
 
-        QueryParserResultDml dml = parser.prepareDmlStatement(stmt);
+        IndexingQueryFilter filter = backupFilter(topVer, parts);
 
-        UpdatePlan plan = updatePlan(schema, conn, dml, fldsQry, loc);
+        UpdatePlan plan = updatePlan(schema, dml, fldsQry);
 
         GridCacheContext planCctx = plan.cacheContext();
 
@@ -1904,22 +1763,15 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
         GridQueryCancel cancel,
         boolean loc
     ) throws IgniteCheckedException {
-        Connection conn = connMgr.connectionForThread().connection(schemaName);
-
-        H2Utils.setupConnection(conn, false, qry.isEnforceJoinOrder());
+        QueryParserResult parseRes = parser.parse(schemaName, qry, false);
 
-        PreparedStatement stmt = preparedStatementWithParams(conn, 
qry.getSql(), Arrays.asList(qry.getArgs()), true);
+        assert parseRes.remainingQuery() == null;
 
-        Connection c;
+        QueryParserResultDml dml = parseRes.dml();
 
-        try {
-            c = stmt.getConnection();
-        }
-        catch (SQLException e) {
-            throw new IgniteCheckedException(e);
-        }
+        assert dml != null;
 
-        return executeUpdate(schemaName, c, parser.prepareDmlStatement(stmt), 
qry, loc, filter, cancel);
+        return executeUpdate(schemaName, dml, qry, loc, filter, cancel);
     }
 
     /**
@@ -1934,7 +1786,7 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
      */
     @Override public boolean registerType(GridCacheContextInfo cacheInfo, 
GridQueryTypeDescriptor type, boolean isSql)
         throws IgniteCheckedException {
-        H2Utils.validateTypeDescriptor(type);
+        validateTypeDescriptor(type);
         schemaMgr.onCacheTypeCreated(cacheInfo, this, type, isSql);
 
         return true;
@@ -2116,8 +1968,6 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
         schemaMgr = new SchemaManager(ctx, connMgr);
         schemaMgr.start(ctx.config().getSqlSchemas());
 
-        valCtx = new CacheQueryObjectValueContext(ctx);
-
         nodeId = ctx.localNodeId();
         marshaller = ctx.config().getMarshaller();
 
@@ -2459,7 +2309,6 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
 
     /**
      * @param schemaName Schema.
-     * @param conn Connection.
      * @param dml DML statement.
      * @param fieldsQry Initial query
      * @param cancel Query cancel.
@@ -2469,7 +2318,6 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
     @SuppressWarnings("unchecked")
     private List<QueryCursorImpl<List<?>>> executeUpdateDistributed(
         String schemaName,
-        Connection conn,
         QueryParserResultDml dml,
         SqlFieldsQuery fieldsQry,
         GridQueryCancel cancel
@@ -2481,7 +2329,7 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
 
             List<Object[]> argss = fieldsQry0.batchedArguments();
 
-            UpdatePlan plan = updatePlan(schemaName, conn, dml, fieldsQry0, 
false);
+            UpdatePlan plan = updatePlan(schemaName, dml, fieldsQry0);
 
             GridCacheContext<?, ?> cctx = plan.cacheContext();
 
@@ -2517,7 +2365,7 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
                     UpdateResult res;
 
                     try {
-                        res = executeUpdate(schemaName, conn, dml, qry0, 
false, null, cancel);
+                        res = executeUpdate(schemaName, dml, qry0, false, 
null, cancel);
 
                         cntPerRow[cntr++] = (int)res.counter();
 
@@ -2556,7 +2404,7 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
             return resCurs;
         }
         else {
-            UpdateResult res = executeUpdate(schemaName, conn, dml, fieldsQry, 
false, null, cancel);
+            UpdateResult res = executeUpdate(schemaName, dml, fieldsQry, 
false, null, cancel);
 
             res.throwIfError();
 
@@ -2573,7 +2421,6 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
      * Execute DML statement, possibly with few re-attempts in case of 
concurrent data modifications.
      *
      * @param schemaName Schema.
-     * @param conn Connection.
      * @param dml DML command.
      * @param fieldsQry Original query.
      * @param loc Query locality flag.
@@ -2582,14 +2429,19 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
      * @return Update result (modified items count and failed keys).
      * @throws IgniteCheckedException if failed.
      */
-    private UpdateResult executeUpdate(String schemaName, Connection conn, 
QueryParserResultDml dml,
-        SqlFieldsQuery fieldsQry, boolean loc, IndexingQueryFilter filters, 
GridQueryCancel cancel)
-        throws IgniteCheckedException {
+    private UpdateResult executeUpdate(
+        String schemaName,
+        QueryParserResultDml dml,
+        SqlFieldsQuery fieldsQry,
+        boolean loc,
+        IndexingQueryFilter filters,
+        GridQueryCancel cancel
+    ) throws IgniteCheckedException {
         Object[] errKeys = null;
 
         long items = 0;
 
-        UpdatePlan plan = updatePlan(schemaName, conn, dml, fieldsQry, loc);
+        UpdatePlan plan = updatePlan(schemaName, dml, fieldsQry);
 
         GridCacheContext<?, ?> cctx = plan.cacheContext();
 
@@ -2645,12 +2497,12 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
     ) throws IgniteCheckedException {
         GridCacheContext cctx = plan.cacheContext();
 
+        DmlDistributedPlanInfo distributedPlan = loc ? null : 
plan.distributedPlan();
+
         if (cctx != null && cctx.mvccEnabled()) {
             assert cctx.transactional();
 
-            DmlDistributedPlanInfo distributedPlan = plan.distributedPlan();
-
-            GridNearTxLocal tx = tx(cctx.kernalContext());
+            GridNearTxLocal tx = tx(ctx);
 
             boolean implicit = (tx == null);
 
@@ -2660,7 +2512,7 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
             if (implicit)
                 tx = txStart(cctx, fieldsQry.getTimeout());
 
-            requestSnapshot(cctx, tx);
+            requestSnapshot(tx);
 
             try (GridNearTxLocal toCommit = commit ? tx : null) {
                 long timeout = implicit
@@ -2758,7 +2610,7 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
                 if (ex instanceof IgniteException)
                     throw (IgniteException)ex;
 
-                U.error(log, "Error during update [localNodeId=" + 
cctx.localNodeId() + "]", ex);
+                U.error(log, "Error during update [localNodeId=" + 
ctx.localNodeId() + "]", ex);
 
                 throw new IgniteSQLException("Failed to run update. " + 
ex.getMessage(), ex);
             }
@@ -2773,16 +2625,22 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
         if (fastUpdateRes != null)
             return fastUpdateRes;
 
-        if (plan.distributedPlan() != null) {
-            DmlDistributedPlanInfo distributedPlan = plan.distributedPlan();
-
-            assert distributedPlan != null;
-
+        if (distributedPlan != null) {
             if (cancel == null)
                 cancel = new GridQueryCancel();
 
-            UpdateResult result = runDistributedUpdate(schemaName, fieldsQry, 
distributedPlan.getCacheIds(),
-                distributedPlan.isReplicatedOnly(), cancel);
+            UpdateResult result = rdcQryExec.update(
+                schemaName,
+                distributedPlan.getCacheIds(),
+                fieldsQry.getSql(),
+                fieldsQry.getArgs(),
+                fieldsQry.isEnforceJoinOrder(),
+                fieldsQry.getPageSize(),
+                fieldsQry.getTimeout(),
+                fieldsQry.getPartitions(),
+                distributedPlan.isReplicatedOnly(),
+                cancel
+            );
 
             // null is returned in case not all nodes support distributed DML.
             if (result != null)
@@ -2845,24 +2703,20 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
      * Generate SELECT statements to retrieve data for modifications from and 
find fast UPDATE or DELETE args,
      * if available.
      *
-     * @param schema Schema.
-     * @param conn Connection.
+     * @param schemaName Schema.
      * @param dml Command.
      * @param fieldsQry Original fields query.
-     * @param loc Local query flag.
      * @return Update plan.
      */
     @SuppressWarnings("IfMayBeConditional")
     private UpdatePlan updatePlan(
-        String schema,
-        Connection conn,
+        String schemaName,
         QueryParserResultDml dml,
-        SqlFieldsQuery fieldsQry,
-        boolean loc
+        SqlFieldsQuery fieldsQry
     ) throws IgniteCheckedException {
         // Disallow updates on SYSTEM schema.
-        if (F.eq(QueryUtils.SCHEMA_SYS, schema))
-            throw new IgniteSQLException("DML statements are not supported on 
" + schema + " schema",
+        if (F.eq(QueryUtils.SCHEMA_SYS, schemaName))
+            throw new IgniteSQLException("DML statements are not supported on 
" + schemaName + " schema",
                 IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
 
         // Disallow updates inside transaction if this is not MVCC mode.
@@ -2875,7 +2729,7 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
                 "\"IGNITE_ALLOW_DML_INSIDE_TRANSACTION\"");
         }
 
-        H2CachedStatementKey planKey = new H2CachedStatementKey(schema, 
dml.statement().getSQL(), fieldsQry, loc);
+        H2CachedStatementKey planKey = new H2CachedStatementKey(schemaName, 
dml.statement().getSQL(), fieldsQry);
 
         UpdatePlan res = updatePlanCache.get(planKey);
 
@@ -2883,11 +2737,10 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
             return res;
 
         res = UpdatePlanBuilder.planForStatement(
+            schemaName,
             dml.statement(),
             dml.mvccEnabled(),
-            loc,
             this,
-            conn,
             fieldsQry
         );
 
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/PreparedStatementEx.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/PreparedStatementEx.java
deleted file mode 100644
index 50dd892..0000000
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/PreparedStatementEx.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2;
-
-import java.sql.PreparedStatement;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.jetbrains.annotations.Nullable;
-
-/**
- *
- */
-public interface PreparedStatementEx extends PreparedStatement {
-    /** */
-    static final AtomicInteger metaIdGenerator = new AtomicInteger();
-
-    /** Flag if at least one MVCC cache is used in this statement. */
-    static final int MVCC_STATE = metaIdGenerator.getAndIncrement();
-
-    /** First mvcc cache id of the involved caches. */
-    static final int MVCC_CACHE_ID = metaIdGenerator.getAndIncrement();
-
-    /**
-     * @param id Metadata key.
-     * @return Attached metadata.
-     */
-    @Nullable <T> T meta(int id);
-
-    /**
-     * @param id Metadata key.
-     * @param metaObj  Metadata object.
-     */
-    void putMeta(int id, Object metaObj);
-}
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/PreparedStatementExImpl.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/PreparedStatementExImpl.java
deleted file mode 100644
index e98e537..0000000
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/PreparedStatementExImpl.java
+++ /dev/null
@@ -1,648 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2;
-
-import java.io.InputStream;
-import java.io.Reader;
-import java.math.BigDecimal;
-import java.net.URL;
-import java.sql.Array;
-import java.sql.Blob;
-import java.sql.Clob;
-import java.sql.Connection;
-import java.sql.Date;
-import java.sql.NClob;
-import java.sql.ParameterMetaData;
-import java.sql.PreparedStatement;
-import java.sql.Ref;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.RowId;
-import java.sql.SQLException;
-import java.sql.SQLType;
-import java.sql.SQLWarning;
-import java.sql.SQLXML;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.util.Arrays;
-import java.util.Calendar;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * PreparedStatement with extended capability to store additional meta 
information.
- */
-@SuppressWarnings("unchecked")
-final class PreparedStatementExImpl implements PreparedStatementEx {
-    /** */
-    private final PreparedStatement delegate;
-
-    /** */
-    private Object[] meta = null;
-
-    /**
-     * @param delegate Wrapped statement.
-     */
-    public PreparedStatementExImpl(PreparedStatement delegate) {
-        this.delegate = delegate;
-    }
-
-    /** {@inheritDoc} */
-    @Override public ResultSet executeQuery() throws SQLException {
-        return delegate.executeQuery();
-    }
-
-    /** {@inheritDoc} */
-    @Override public int executeUpdate() throws SQLException {
-        return delegate.executeUpdate();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setNull(int parameterIndex, int sqlType) throws 
SQLException {
-        delegate.setNull(parameterIndex, sqlType);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setBoolean(int parameterIndex, boolean x) throws 
SQLException {
-        delegate.setBoolean(parameterIndex, x);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setByte(int parameterIndex, byte x) throws 
SQLException {
-        delegate.setByte(parameterIndex, x);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setShort(int parameterIndex, short x) throws 
SQLException {
-        delegate.setShort(parameterIndex, x);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setInt(int parameterIndex, int x) throws 
SQLException {
-        delegate.setInt(parameterIndex, x);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setLong(int parameterIndex, long x) throws 
SQLException {
-        delegate.setLong(parameterIndex, x);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setFloat(int parameterIndex, float x) throws 
SQLException {
-        delegate.setFloat(parameterIndex, x);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setDouble(int parameterIndex, double x) throws 
SQLException {
-        delegate.setDouble(parameterIndex, x);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setBigDecimal(int parameterIndex, BigDecimal x) 
throws SQLException {
-        delegate.setBigDecimal(parameterIndex, x);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setString(int parameterIndex, String x) throws 
SQLException {
-        delegate.setString(parameterIndex, x);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setBytes(int parameterIndex, byte[] x) throws 
SQLException {
-        delegate.setBytes(parameterIndex, x);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setDate(int parameterIndex, Date x) throws 
SQLException {
-        delegate.setDate(parameterIndex, x);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setTime(int parameterIndex, Time x) throws 
SQLException {
-        delegate.setTime(parameterIndex, x);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setTimestamp(int parameterIndex, Timestamp x) throws 
SQLException {
-        delegate.setTimestamp(parameterIndex, x);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setAsciiStream(int parameterIndex, InputStream x, 
int length) throws SQLException {
-        delegate.setAsciiStream(parameterIndex, x, length);
-    }
-
-    /** {@inheritDoc} */
-    @Deprecated
-    @Override public void setUnicodeStream(int parameterIndex, InputStream x, 
int length) throws SQLException {
-        delegate.setUnicodeStream(parameterIndex, x, length);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setBinaryStream(int parameterIndex, InputStream x, 
int length) throws SQLException {
-        delegate.setBinaryStream(parameterIndex, x, length);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void clearParameters() throws SQLException {
-        delegate.clearParameters();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setObject(int parameterIndex, Object x, int 
targetSqlType) throws SQLException {
-        delegate.setObject(parameterIndex, x, targetSqlType);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setObject(int parameterIndex, Object x) throws 
SQLException {
-        delegate.setObject(parameterIndex, x);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean execute() throws SQLException {
-        return delegate.execute();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void addBatch() throws SQLException {
-        delegate.addBatch();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setCharacterStream(int parameterIndex, Reader 
reader, int length) throws SQLException {
-        delegate.setCharacterStream(parameterIndex, reader, length);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setRef(int parameterIndex, Ref x) throws 
SQLException {
-        delegate.setRef(parameterIndex, x);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setBlob(int parameterIndex, Blob x) throws 
SQLException {
-        delegate.setBlob(parameterIndex, x);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setClob(int parameterIndex, Clob x) throws 
SQLException {
-        delegate.setClob(parameterIndex, x);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setArray(int parameterIndex, Array x) throws 
SQLException {
-        delegate.setArray(parameterIndex, x);
-    }
-
-    /** {@inheritDoc} */
-    @Override public ResultSetMetaData getMetaData() throws SQLException {
-        return delegate.getMetaData();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setDate(int parameterIndex, Date x, Calendar cal) 
throws SQLException {
-        delegate.setDate(parameterIndex, x, cal);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setTime(int parameterIndex, Time x, Calendar cal) 
throws SQLException {
-        delegate.setTime(parameterIndex, x, cal);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setTimestamp(int parameterIndex, Timestamp x, 
Calendar cal) throws SQLException {
-        delegate.setTimestamp(parameterIndex, x, cal);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setNull(int parameterIndex, int sqlType, String 
typeName) throws SQLException {
-        delegate.setNull(parameterIndex, sqlType, typeName);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setURL(int parameterIndex, URL x) throws 
SQLException {
-        delegate.setURL(parameterIndex, x);
-    }
-
-    /** {@inheritDoc} */
-    @Override public ParameterMetaData getParameterMetaData() throws 
SQLException {
-        return delegate.getParameterMetaData();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setRowId(int parameterIndex, RowId x) throws 
SQLException {
-        delegate.setRowId(parameterIndex, x);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setNString(int parameterIndex, String value) throws 
SQLException {
-        delegate.setNString(parameterIndex, value);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setNCharacterStream(int parameterIndex, Reader 
value, long length) throws SQLException {
-        delegate.setNCharacterStream(parameterIndex, value, length);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setNClob(int parameterIndex, NClob value) throws 
SQLException {
-        delegate.setNClob(parameterIndex, value);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setClob(int parameterIndex, Reader reader, long 
length) throws SQLException {
-        delegate.setClob(parameterIndex, reader, length);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setBlob(int parameterIndex, InputStream inputStream, 
long length) throws SQLException {
-        delegate.setBlob(parameterIndex, inputStream, length);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setNClob(int parameterIndex, Reader reader, long 
length) throws SQLException {
-        delegate.setNClob(parameterIndex, reader, length);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setSQLXML(int parameterIndex, SQLXML xmlObject) 
throws SQLException {
-        delegate.setSQLXML(parameterIndex, xmlObject);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setObject(int parameterIndex, Object x, int 
targetSqlType, int scaleOrLength) throws SQLException {
-        delegate.setObject(parameterIndex, x, targetSqlType, scaleOrLength);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setAsciiStream(int parameterIndex, InputStream x, 
long length) throws SQLException {
-        delegate.setAsciiStream(parameterIndex, x, length);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setBinaryStream(int parameterIndex, InputStream x, 
long length) throws SQLException {
-        delegate.setBinaryStream(parameterIndex, x, length);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setCharacterStream(int parameterIndex, Reader 
reader, long length) throws SQLException {
-        delegate.setCharacterStream(parameterIndex, reader, length);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setAsciiStream(int parameterIndex, InputStream x) 
throws SQLException {
-        delegate.setAsciiStream(parameterIndex, x);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setBinaryStream(int parameterIndex, InputStream x) 
throws SQLException {
-        delegate.setBinaryStream(parameterIndex, x);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setCharacterStream(int parameterIndex, Reader 
reader) throws SQLException {
-        delegate.setCharacterStream(parameterIndex, reader);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setNCharacterStream(int parameterIndex, Reader 
value) throws SQLException {
-        delegate.setNCharacterStream(parameterIndex, value);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setClob(int parameterIndex, Reader reader) throws 
SQLException {
-        delegate.setClob(parameterIndex, reader);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setBlob(int parameterIndex, InputStream inputStream) 
throws SQLException {
-        delegate.setBlob(parameterIndex, inputStream);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setNClob(int parameterIndex, Reader reader) throws 
SQLException {
-        delegate.setNClob(parameterIndex, reader);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setObject(int parameterIndex, Object x, SQLType 
targetSqlType, int scaleOrLength) throws SQLException {
-        delegate.setObject(parameterIndex, x, targetSqlType, scaleOrLength);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setObject(int parameterIndex, Object x, SQLType 
targetSqlType) throws SQLException {
-        delegate.setObject(parameterIndex, x, targetSqlType);
-    }
-
-    /** {@inheritDoc} */
-    @Override public long executeLargeUpdate() throws SQLException {
-        return delegate.executeLargeUpdate();
-    }
-
-    /** {@inheritDoc} */
-    @Override public ResultSet executeQuery(String sql) throws SQLException {
-        return delegate.executeQuery(sql);
-    }
-
-    /** {@inheritDoc} */
-    @Override public int executeUpdate(String sql) throws SQLException {
-        return delegate.executeUpdate(sql);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void close() throws SQLException {
-        delegate.close();
-    }
-
-    /** {@inheritDoc} */
-    @Override public int getMaxFieldSize() throws SQLException {
-        return delegate.getMaxFieldSize();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setMaxFieldSize(int max) throws SQLException {
-        delegate.setMaxFieldSize(max);
-    }
-
-    /** {@inheritDoc} */
-    @Override public int getMaxRows() throws SQLException {
-        return delegate.getMaxRows();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setMaxRows(int max) throws SQLException {
-        delegate.setMaxRows(max);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setEscapeProcessing(boolean enable) throws 
SQLException {
-        delegate.setEscapeProcessing(enable);
-    }
-
-    /** {@inheritDoc} */
-    @Override public int getQueryTimeout() throws SQLException {
-        return delegate.getQueryTimeout();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setQueryTimeout(int seconds) throws SQLException {
-        delegate.setQueryTimeout(seconds);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void cancel() throws SQLException {
-        delegate.cancel();
-    }
-
-    /** {@inheritDoc} */
-    @Override public SQLWarning getWarnings() throws SQLException {
-        return delegate.getWarnings();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void clearWarnings() throws SQLException {
-        delegate.clearWarnings();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setCursorName(String name) throws SQLException {
-        delegate.setCursorName(name);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean execute(String sql) throws SQLException {
-        return delegate.execute(sql);
-    }
-
-    /** {@inheritDoc} */
-    @Override public ResultSet getResultSet() throws SQLException {
-        return delegate.getResultSet();
-    }
-
-    /** {@inheritDoc} */
-    @Override public int getUpdateCount() throws SQLException {
-        return delegate.getUpdateCount();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean getMoreResults() throws SQLException {
-        return delegate.getMoreResults();
-    }
-
-    /** {@inheritDoc} */
-    @Override public int getFetchDirection() throws SQLException {
-        return delegate.getFetchDirection();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setFetchDirection(int direction) throws SQLException 
{
-        delegate.setFetchDirection(direction);
-    }
-
-    /** {@inheritDoc} */
-    @Override public int getFetchSize() throws SQLException {
-        return delegate.getFetchSize();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setFetchSize(int rows) throws SQLException {
-        delegate.setFetchSize(rows);
-    }
-
-    /** {@inheritDoc} */
-    @Override public int getResultSetConcurrency() throws SQLException {
-        return delegate.getResultSetConcurrency();
-    }
-
-    /** {@inheritDoc} */
-    @Override public int getResultSetType() throws SQLException {
-        return delegate.getResultSetType();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void addBatch(String sql) throws SQLException {
-        delegate.addBatch(sql);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void clearBatch() throws SQLException {
-        delegate.clearBatch();
-    }
-
-    /** {@inheritDoc} */
-    @Override public int[] executeBatch() throws SQLException {
-        return delegate.executeBatch();
-    }
-
-    /** {@inheritDoc} */
-    @Override public Connection getConnection() throws SQLException {
-        return delegate.getConnection();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean getMoreResults(int current) throws SQLException {
-        return delegate.getMoreResults(current);
-    }
-
-    /** {@inheritDoc} */
-    @Override public ResultSet getGeneratedKeys() throws SQLException {
-        return delegate.getGeneratedKeys();
-    }
-
-    /** {@inheritDoc} */
-    @Override public int executeUpdate(String sql, int autoGeneratedKeys) 
throws SQLException {
-        return delegate.executeUpdate(sql, autoGeneratedKeys);
-    }
-
-    /** {@inheritDoc} */
-    @Override public int executeUpdate(String sql, int[] columnIndexes) throws 
SQLException {
-        return delegate.executeUpdate(sql, columnIndexes);
-    }
-
-    /** {@inheritDoc} */
-    @Override public int executeUpdate(String sql, String[] columnNames) 
throws SQLException {
-        return delegate.executeUpdate(sql, columnNames);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean execute(String sql, int autoGeneratedKeys) throws 
SQLException {
-        return delegate.execute(sql, autoGeneratedKeys);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean execute(String sql, int[] columnIndexes) throws 
SQLException {
-        return delegate.execute(sql, columnIndexes);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean execute(String sql, String[] columnNames) throws 
SQLException {
-        return delegate.execute(sql, columnNames);
-    }
-
-    /** {@inheritDoc} */
-    @Override public int getResultSetHoldability() throws SQLException {
-        return delegate.getResultSetHoldability();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean isClosed() throws SQLException {
-        return delegate.isClosed();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean isPoolable() throws SQLException {
-        return delegate.isPoolable();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setPoolable(boolean poolable) throws SQLException {
-        delegate.setPoolable(poolable);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void closeOnCompletion() throws SQLException {
-        delegate.closeOnCompletion();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean isCloseOnCompletion() throws SQLException {
-        return delegate.isCloseOnCompletion();
-    }
-
-    /** {@inheritDoc} */
-    @Override public long getLargeUpdateCount() throws SQLException {
-        return delegate.getLargeUpdateCount();
-    }
-
-    /** {@inheritDoc} */
-    @Override public long getLargeMaxRows() throws SQLException {
-        return delegate.getLargeMaxRows();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setLargeMaxRows(long max) throws SQLException {
-        delegate.setLargeMaxRows(max);
-    }
-
-    /** {@inheritDoc} */
-    @Override public long[] executeLargeBatch() throws SQLException {
-        return delegate.executeLargeBatch();
-    }
-
-    /** {@inheritDoc} */
-    @Override public long executeLargeUpdate(String sql) throws SQLException {
-        return delegate.executeLargeUpdate(sql);
-    }
-
-    /** {@inheritDoc} */
-    @Override public long executeLargeUpdate(String sql, int 
autoGeneratedKeys) throws SQLException {
-        return delegate.executeLargeUpdate(sql, autoGeneratedKeys);
-    }
-
-    /** {@inheritDoc} */
-    @Override public long executeLargeUpdate(String sql, int[] columnIndexes) 
throws SQLException {
-        return delegate.executeLargeUpdate(sql, columnIndexes);
-    }
-
-    /** {@inheritDoc} */
-    @Override public long executeLargeUpdate(String sql, String[] columnNames) 
throws SQLException {
-        return delegate.executeLargeUpdate(sql, columnNames);
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public <T> T unwrap(Class<T> iface) throws SQLException {
-        if (iface == PreparedStatementExImpl.class || iface == 
PreparedStatementEx.class)
-            return (T)this;
-
-        return delegate.unwrap(iface);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean isWrapperFor(Class<?> iface) throws SQLException {
-        return iface == PreparedStatementExImpl.class
-            || iface == PreparedStatementEx.class
-            || delegate.isWrapperFor(iface);
-    }
-
-    /** {@inheritDoc} */
-    @Override @Nullable public <T> T meta(int id) {
-        return meta != null && id < meta.length ? (T)meta[id] : null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void putMeta(int id, Object metaObj) {
-        if (meta == null)
-            meta = new Object[id + 1];
-        else if (meta.length <= id)
-            meta = Arrays.copyOf(meta, id + 1);
-
-        meta[id] = metaObj;
-    }
-
-    /**
-     *
-     * @param stmt Prepared statement to wrap.
-     * @return Wrapped statement.
-     */
-    public static PreparedStatement wrap(@NotNull PreparedStatement stmt) {
-        if (stmt.getClass() == PreparedStatementExImpl.class)
-            return stmt;
-
-        return new PreparedStatementExImpl(stmt);
-    }
-}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java
index 13810cb..029a77c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.query.h2;
 
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.CacheAtomicityMode;
@@ -31,11 +30,16 @@ import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.h2.dml.DmlAstUtils;
 import org.apache.ignite.internal.processors.query.h2.dml.DmlUtils;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlAlias;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlAst;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlInsert;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlStatement;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlTable;
 import org.apache.ignite.internal.sql.SqlParseException;
 import org.apache.ignite.internal.sql.SqlParser;
 import org.apache.ignite.internal.sql.SqlStrictParseException;
@@ -62,6 +66,7 @@ import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.regex.Pattern;
 
 /**
@@ -72,7 +77,7 @@ public class QueryParser {
     private static final int CACHE_SIZE = 1024;
 
     /** A pattern for commands having internal implementation in Ignite. */
-    public static final Pattern INTERNAL_CMD_RE = Pattern.compile(
+    private static final Pattern INTERNAL_CMD_RE = Pattern.compile(
         
"^(create|drop)\\s+index|^alter\\s+table|^copy|^set|^begin|^commit|^rollback|^(create|alter|drop)\\s+user",
         Pattern.CASE_INSENSITIVE);
 
@@ -107,10 +112,11 @@ public class QueryParser {
      *
      * @param schemaName schema name.
      * @param qry query to parse.
+     * @param remainingAllowed Whether multiple statements are allowed.
      * @return Parsing result that contains Parsed leading query and remaining sql script.
      */
-    public QueryParserResult parse(String schemaName, SqlFieldsQuery qry) {
-        QueryParserResult res = parse0(schemaName, qry);
+    public QueryParserResult parse(String schemaName, SqlFieldsQuery qry, 
boolean remainingAllowed) {
+        QueryParserResult res = parse0(schemaName, qry, remainingAllowed);
 
         checkQueryType(qry, res.isSelect());
 
@@ -122,9 +128,10 @@ public class QueryParser {
      *
      * @param schemaName schema name.
      * @param qry query to parse.
+     * @param remainingAllowed Whether multiple statements are allowed.
      * @return Parsing result that contains Parsed leading query and remaining sql script.
      */
-    private QueryParserResult parse0(String schemaName, SqlFieldsQuery qry) {
+    private QueryParserResult parse0(String schemaName, SqlFieldsQuery qry, 
boolean remainingAllowed) {
         // First, let's check if we already have a two-step query for this statement...
         QueryParserCacheKey cachedKey = new QueryParserCacheKey(
             schemaName,
@@ -137,15 +144,15 @@ public class QueryParser {
 
         QueryParserCacheEntry cached = cache.get(cachedKey);
 
-        if (cached != null)
+        if (cached != null) 
             return new QueryParserResult(qry, null, cached.select(), 
cached.dml(), cached.command());
 
         // Try parting as native command.
-        QueryParserResult parseRes = parseNative(schemaName, qry);
+        QueryParserResult parseRes = parseNative(schemaName, qry, 
remainingAllowed);
 
         // Otherwise parse with H2.
         if (parseRes == null)
-            parseRes = parseH2(schemaName, qry);
+            parseRes = parseH2(schemaName, qry, remainingAllowed);
 
         // Add to cache if not multi-statement.
         if (parseRes.remainingQuery() == null) {
@@ -164,11 +171,12 @@ public class QueryParser {
      *
      * @param schemaName Schema name.
      * @param qry which sql text to parse.
+     * @param remainingAllowed Whether multiple statements are allowed.
      * @return Command or {@code null} if cannot parse this query.
      */
     @SuppressWarnings("IfMayBeConditional")
     @Nullable
-    private QueryParserResult parseNative(String schemaName, SqlFieldsQuery 
qry) {
+    private QueryParserResult parseNative(String schemaName, SqlFieldsQuery 
qry, boolean remainingAllowed) {
         String sql = qry.getSql();
 
         // Heuristic check for fast return.
@@ -198,12 +206,13 @@ public class QueryParser {
 
             SqlFieldsQuery newQry = 
cloneFieldsQuery(qry).setSql(parser.lastCommandSql());
 
-            SqlFieldsQuery remainingQry;
-
-            if (F.isEmpty(parser.remainingSql()))
-                remainingQry = null;
-            else
+            SqlFieldsQuery remainingQry = null;
+            
+            if (!F.isEmpty(parser.remainingSql())) {
+                checkRemainingAllowed(remainingAllowed);
+                
                 remainingQry = 
cloneFieldsQuery(qry).setSql(parser.remainingSql()).setArgs(qry.getArgs());
+            }
 
             QueryParserResultCommand cmd = new 
QueryParserResultCommand(nativeCmd, null, false);
 
@@ -234,10 +243,11 @@ public class QueryParser {
      *
      * @param schemaName Schema name.
      * @param qry Query.
+     * @param remainingAllowed Whether multiple statements are allowed.
      * @return Parsing result.
      */
     @SuppressWarnings("IfMayBeConditional")
-    private QueryParserResult parseH2(String schemaName, SqlFieldsQuery qry) {
+    private QueryParserResult parseH2(String schemaName, SqlFieldsQuery qry, 
boolean remainingAllowed) {
         Connection c = connMgr.connectionForThread().connection(schemaName);
 
         // For queries that are explicitly local, we rely on the flag specified in the query
@@ -258,151 +268,208 @@ public class QueryParser {
                 IgniteQueryErrorCode.PARSING, e);
         }
 
-        if (qry.isLocal() && GridSqlQueryParser.checkMultipleStatements(stmt))
-            throw new IgniteSQLException("Multiple statements queries are not 
supported for local queries.",
-                IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
-
-        GridSqlQueryParser.PreparedWithRemaining prep = 
GridSqlQueryParser.preparedWithRemaining(stmt);
-
-        Prepared prepared = prep.prepared();
-
-        if (GridSqlQueryParser.isExplainUpdate(prepared))
-            throw new IgniteSQLException("Explains of update queries are not 
supported.",
-                IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+        try {
+            if (qry.isLocal() && 
GridSqlQueryParser.checkMultipleStatements(stmt))
+                throw new IgniteSQLException("Multiple statements queries are 
not supported for local queries.",
+                    IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
 
-        int paramsCnt = prepared.getParameters().size();
+            GridSqlQueryParser.PreparedWithRemaining prep = 
GridSqlQueryParser.preparedWithRemaining(stmt);
 
-        Object[] argsOrig = qry.getArgs();
+            Prepared prepared = prep.prepared();
 
-        Object[] args = null;
-        Object[] remainingArgs = null;
+            if (GridSqlQueryParser.isExplainUpdate(prepared))
+                throw new IgniteSQLException("Explains of update queries are 
not supported.",
+                    IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
 
-        if (!DmlUtils.isBatched(qry) && paramsCnt > 0) {
-            if (argsOrig == null || argsOrig.length < paramsCnt) {
-                throw new IgniteException("Invalid number of query parameters. 
" +
-                    "Cannot find " + (argsOrig != null ? argsOrig.length + 1 : 
1) + " parameter.");
+            // Get remaining query and check if it is allowed.
+            SqlFieldsQuery remainingQry = null;
+            
+            if (!F.isEmpty(prep.remainingSql())) {
+                checkRemainingAllowed(remainingAllowed);
+                
+                remainingQry = 
cloneFieldsQuery(qry).setSql(prep.remainingSql());
             }
 
-            args = Arrays.copyOfRange(argsOrig, 0, paramsCnt);
+            // Prepare new query.
+            SqlFieldsQuery newQry = 
cloneFieldsQuery(qry).setSql(prepared.getSQL());
 
-            if (paramsCnt != argsOrig.length)
-                remainingArgs = Arrays.copyOfRange(argsOrig, paramsCnt, 
argsOrig.length);
-        }
-        else
-            remainingArgs = argsOrig;
+            int paramsCnt = prepared.getParameters().size();
 
-        SqlFieldsQuery remainingQry;
+            Object[] argsOrig = qry.getArgs();
 
-        if (F.isEmpty(prep.remainingSql()))
-            remainingQry = null;
-        else
-            remainingQry = 
cloneFieldsQuery(qry).setSql(prep.remainingSql()).setArgs(remainingArgs);
+            Object[] args = null;
+            Object[] remainingArgs = null;
 
-        SqlFieldsQuery newQry = 
cloneFieldsQuery(qry).setSql(prepared.getSQL()).setArgs(args);
+            if (!DmlUtils.isBatched(qry) && paramsCnt > 0) {
+                if (argsOrig == null || argsOrig.length < paramsCnt)
+                    // Not enough parameters, but we will handle this later, during the execution phase.
+                    args = argsOrig;
+                else {
+                    args = Arrays.copyOfRange(argsOrig, 0, paramsCnt);
 
-        if (CommandProcessor.isCommand(prepared)) {
-            GridSqlStatement cmdH2 = new 
GridSqlQueryParser(false).parse(prepared);
+                    if (paramsCnt != argsOrig.length)
+                        remainingArgs = Arrays.copyOfRange(argsOrig, 
paramsCnt, argsOrig.length);
+                }
+            }
+            else
+                remainingArgs = argsOrig;
 
-            QueryParserResultCommand cmd = new QueryParserResultCommand(null, 
cmdH2, false);
+            newQry.setArgs(args);
 
-            return new QueryParserResult(newQry, remainingQry, null, null, 
cmd);
-        }
-        else if (CommandProcessor.isCommandNoOp(prepared)) {
-            QueryParserResultCommand cmd = new QueryParserResultCommand(null, 
null, true);
+            if (remainingQry != null)
+                remainingQry.setArgs(remainingArgs);
 
-            return new QueryParserResult(newQry, remainingQry, null, null, 
cmd);
-        }
-        else if (GridSqlQueryParser.isDml(prepared)) {
-            QueryParserResultDml dml = prepareDmlStatement(prepared);
-
-            return new QueryParserResult(newQry, remainingQry, null, dml, 
null);
-        }
-        else if (!prepared.isQuery()) {
-            throw new IgniteSQLException("Unsupported statement: " + 
newQry.getSql(),
-                IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
-        }
-
-        // Parse SELECT. Split is required either if query is distirubted, or 
when it is local, but executed
-        // over segmented PARTITIONED case. In this case multiple map queries 
will be executed against local
-        // node stripes in parallel and then merged through reduce process.
+            // Do actual parsing.
+            if (CommandProcessor.isCommand(prepared)) {
+                GridSqlStatement cmdH2 = new 
GridSqlQueryParser(false).parse(prepared);
 
+                QueryParserResultCommand cmd = new 
QueryParserResultCommand(null, cmdH2, false);
 
-        // Calculate if query is in fact can be executed locally.
-        boolean loc = qry.isLocal();
+                return new QueryParserResult(newQry, remainingQry, null, null, 
cmd);
+            }
+            else if (CommandProcessor.isCommandNoOp(prepared)) {
+                QueryParserResultCommand cmd = new 
QueryParserResultCommand(null, null, true);
 
-        GridSqlQueryParser parser = null;
+                return new QueryParserResult(newQry, remainingQry, null, null, 
cmd);
+            }
+            else if (GridSqlQueryParser.isDml(prepared)) {
+                QueryParserResultDml dml = prepareDmlStatement(prepared);
 
-        if (!loc) {
-            parser = new GridSqlQueryParser(false);
+                return new QueryParserResult(newQry, remainingQry, null, dml, 
null);
+            }
+            else if (!prepared.isQuery()) {
+                throw new IgniteSQLException("Unsupported statement: " + 
newQry.getSql(),
+                    IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+            }
 
-            parser.parse(prepared);
+            // Parse SELECT.
+            GridSqlQueryParser parser = new GridSqlQueryParser(false);
 
-            if (parser.isLocalQuery())
-                loc = true;
-        }
+            GridSqlStatement stmt0 = parser.parse(prepared);
 
-        // If this is a local query, check if it must be split.
-        boolean locSplit = false;
+            List<Integer> cacheIds = parser.cacheIds();
+            Integer mvccCacheId = mvccCacheIdForSelect(parser.objectsMap());
 
-        if (loc) {
-            if (parser == null) {
-                parser = new GridSqlQueryParser(false);
+            // Determine whether the query can in fact be executed locally.
+            boolean loc = qry.isLocal();
 
-                parser.parse(prepared);
+            if (!loc) {
+                if (parser.isLocalQuery())
+                    loc = true;
             }
 
-            GridCacheContext cctx = parser.getFirstPartitionedCache();
-
-            if (cctx != null && cctx.config().getQueryParallelism() > 1)
-                locSplit = true;
-        }
+            // If this is a local query, check if it must be split.
+            boolean locSplit = false;
 
-        boolean splitNeeded = !loc || locSplit;
+            if (loc) {
+                GridCacheContext cctx = parser.getFirstPartitionedCache();
 
-        try {
-            GridCacheTwoStepQuery twoStepQry = null;
-
-            if (splitNeeded) {
-                twoStepQry = GridSqlQuerySplitter.split(
-                    
connMgr.connectionForThread().connection(newQry.getSchema()),
-                    prepared,
-                    newQry.getArgs(),
-                    newQry.isCollocated(),
-                    newQry.isDistributedJoins(),
-                    newQry.isEnforceJoinOrder(),
-                    locSplit,
-                    idx
-                );
+                if (cctx != null && cctx.config().getQueryParallelism() > 1)
+                    locSplit = true;
             }
 
-            List<GridQueryFieldMetadata> meta = 
H2Utils.meta(stmt.getMetaData());
-
-            QueryParserResultSelect select = new 
QueryParserResultSelect(twoStepQry, locSplit, meta);
+            // Split is required either if query is distributed, or when it is 
local, but executed
+            // over segmented PARTITIONED cache. In this case multiple map 
queries will be executed against local
+            // node stripes in parallel and then merged through reduce process.
+            boolean splitNeeded = !loc || locSplit;
+
+            try {
+                GridCacheTwoStepQuery twoStepQry = null;
+
+                if (splitNeeded) {
+                    twoStepQry = GridSqlQuerySplitter.split(
+                        
connMgr.connectionForThread().connection(newQry.getSchema()),
+                        prepared,
+                        newQry.getArgs(),
+                        newQry.isCollocated(),
+                        newQry.isDistributedJoins(),
+                        newQry.isEnforceJoinOrder(),
+                        locSplit,
+                        idx
+                    );
+                }
+
+                List<GridQueryFieldMetadata> meta = 
H2Utils.meta(stmt.getMetaData());
+
+                boolean forUpdate = 
GridSqlQueryParser.isForUpdateQuery(prepared);
+
+                QueryParserResultSelect select = new QueryParserResultSelect(
+                    stmt0,
+                    twoStepQry,
+                    meta,
+                    cacheIds,
+                    mvccCacheId,
+                    forUpdate
+                );
 
-            return new QueryParserResult(newQry, remainingQry, select, null, 
null);
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteSQLException("Failed to parse query: " + 
newQry.getSql(), IgniteQueryErrorCode.PARSING, e);
-        }
-        catch (SQLException e) {
-            throw new IgniteSQLException(e);
+                return new QueryParserResult(newQry, remainingQry, select, 
null, null);
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteSQLException("Failed to parse query: " + 
newQry.getSql(), IgniteQueryErrorCode.PARSING,
+                    e);
+            }
+            catch (SQLException e) {
+                throw new IgniteSQLException(e);
+            }
         }
         finally {
-            // TODO: Leak if returned earlier. Will be fixed in 
https://issues.apache.org/jira/browse/IGNITE-11279
             U.close(stmt, log);
         }
     }
 
     /**
-     * Prepare DML statement.
+     * Throw exception if multiple statements are not allowed.
+     * 
+     * @param allowed Whether multiple statements are allowed.
+     */
+    private static void checkRemainingAllowed(boolean allowed) {
+        if (allowed)
+            return; 
+        
+        throw new IgniteSQLException("Multiple statements queries are not 
supported.",
+            IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+    }
+
+    /**
+     * Get ID of the first MVCC cache for SELECT.
      *
-     * @param preparedStmt Prepared statement.
-     * @return Statement.
+     * @param objMap Object map.
+     * @return ID of the first MVCC cache or {@code null} if no MVCC caches 
involved.
      */
-    public QueryParserResultDml prepareDmlStatement(PreparedStatement 
preparedStmt) {
-        Prepared prep = GridSqlQueryParser.prepared(preparedStmt);
+    private Integer mvccCacheIdForSelect(Map<Object, Object> objMap) {
+        Boolean mvccEnabled = null;
+        Integer mvccCacheId = null;
+        GridCacheContextInfo cctx = null;
 
-        return prepareDmlStatement(prep);
+        for (Object o : objMap.values()) {
+            if (o instanceof GridSqlAlias)
+                o = GridSqlAlias.unwrap((GridSqlAst)o);
+            if (o instanceof GridSqlTable && ((GridSqlTable)o).dataTable() != 
null) {
+                GridSqlTable tbl = (GridSqlTable)o;
+
+                if (tbl.dataTable() != null) {
+                    GridCacheContextInfo curCctx = tbl.dataTable().cacheInfo();
+
+                    assert curCctx != null;
+
+                    boolean curMvccEnabled =
+                        curCctx.config().getAtomicityMode() == 
CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+
+                    if (mvccEnabled == null) {
+                        mvccEnabled = curMvccEnabled;
+
+                        if (mvccEnabled)
+                            mvccCacheId = curCctx.cacheId();
+
+                        cctx = curCctx;
+                    }
+                    else if (mvccEnabled != curMvccEnabled)
+                        
MvccUtils.throwAtomicityModesMismatchException(cctx.config(), curCctx.config());
+                }
+            }
+        }
+
+        return mvccCacheId;
     }
 
     /**
@@ -411,7 +478,7 @@ public class QueryParser {
      * @param prepared Prepared.
      * @return Statement.
      */
-    public QueryParserResultDml prepareDmlStatement(Prepared prepared) {
+    private QueryParserResultDml prepareDmlStatement(Prepared prepared) {
         // Prepare AST.
         GridSqlQueryParser parser = new GridSqlQueryParser(false);
 
@@ -441,7 +508,16 @@ public class QueryParser {
                 MvccUtils.throwAtomicityModesMismatchException(ctx.config(), 
curCtx.config());
         }
 
-        return new QueryParserResultDml(stmt, mvccEnabled);
+        // Get streamer info.
+        GridH2Table streamTbl = null;
+
+        if (GridSqlQueryParser.isStreamableInsertStatement(prepared)) {
+            GridSqlInsert insert = (GridSqlInsert)stmt;
+
+            streamTbl = 
DmlAstUtils.gridTableForElement(insert.into()).dataTable();
+        }
+
+        return new QueryParserResultDml(stmt, mvccEnabled, streamTbl);
     }
 
     /**
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserResultDml.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserResultDml.java
index 03067e0..29299c8 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserResultDml.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserResultDml.java
@@ -17,8 +17,10 @@
 
 package org.apache.ignite.internal.processors.query.h2;
 
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlStatement;
 import org.h2.command.Prepared;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Parsing result for DML statement.
@@ -30,14 +32,20 @@ public class QueryParserResultDml {
     /** MVCC enabled flag. */
     private final boolean mvccEnabled;
 
+    /** Streamer table. */
+    private final GridH2Table streamTbl;
+
     /**
      * Constructor.
      *
      * @param stmt Command.
+     * @param mvccEnabled Whether MVCC is enabled.
+     * @param streamTbl Streamer table.
      */
-    public QueryParserResultDml(GridSqlStatement stmt, boolean mvccEnabled) {
+    public QueryParserResultDml(GridSqlStatement stmt, boolean mvccEnabled, 
@Nullable GridH2Table streamTbl) {
         this.stmt = stmt;
         this.mvccEnabled = mvccEnabled;
+        this.streamTbl = streamTbl;
     }
 
     /**
@@ -53,4 +61,18 @@ public class QueryParserResultDml {
     public boolean mvccEnabled() {
         return mvccEnabled;
     }
+
+    /**
+     * @return Streamer table.
+     */
+    @Nullable public GridH2Table streamTable() {
+        return streamTbl;
+    }
+
+    /**
+     * @return Whether statement can be used in streaming.
+     */
+    public boolean streamable() {
+        return streamTbl != null;
+    }
 }
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserResultSelect.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserResultSelect.java
index 29cbe0e..77eec9a 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserResultSelect.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserResultSelect.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.query.h2;
 
 import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlStatement;
 import org.jetbrains.annotations.Nullable;
 
 import java.util.List;
@@ -28,47 +29,62 @@ import java.util.List;
  */
 @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
 public class QueryParserResultSelect {
+    /** Statement. */
+    private GridSqlStatement stmt;
+
     /** Two-step query, or {@code} null if this result is for local query. */
     private final GridCacheTwoStepQuery twoStepQry;
 
-    /** Whether local split is needed. */
-    private final boolean locSplit;
-
     /** Metadata for two-step query, or {@code} null if this result is for 
local query. */
     private final List<GridQueryFieldMetadata> meta;
 
+    /** Involved cache IDs. */
+    private final List<Integer> cacheIds;
+
+    /** ID of the first MVCC cache. */
+    private final Integer mvccCacheId;
+
+    /** FOR UPDATE flag. */
+    private final boolean forUpdate;
+
     /**
      * Constructor.
      *
+     * @param stmt Statement.
      * @param twoStepQry Distributed query plan.
-     * @param locSplit Whether local split is needed.
      * @param meta Fields metadata.
+     * @param cacheIds Cache IDs.
+     * @param mvccCacheId ID of the first MVCC cache.
+     * @param forUpdate Whether this is a FOR UPDATE query.
      */
     public QueryParserResultSelect(
+        GridSqlStatement stmt,
         @Nullable GridCacheTwoStepQuery twoStepQry,
-        boolean locSplit,
-        List<GridQueryFieldMetadata> meta
+        List<GridQueryFieldMetadata> meta,
+        List<Integer> cacheIds,
+        @Nullable Integer mvccCacheId,
+        boolean forUpdate
     ) {
-        // Local split can be true only is there is a two-step plan.
-        assert twoStepQry == null && !locSplit || twoStepQry != null;
-
+        this.stmt = stmt;
         this.twoStepQry = twoStepQry;
-        this.locSplit = locSplit;
         this.meta = meta;
+        this.cacheIds = cacheIds;
+        this.mvccCacheId = mvccCacheId;
+        this.forUpdate = forUpdate;
     }
 
     /**
-     * @return Two-step query, or {@code} null if this result is for local 
query.
+     * @return Parsed SELECT statement.
      */
-    @Nullable public GridCacheTwoStepQuery twoStepQuery() {
-        return twoStepQry;
+    public GridSqlStatement statement() {
+        return stmt;
     }
 
     /**
-     * @return {@code True} if local query should be split.
+     * @return Two-step query, or {@code} null if this result is for local 
query.
      */
-    public boolean localSplit() {
-        return locSplit;
+    @Nullable public GridCacheTwoStepQuery twoStepQuery() {
+        return twoStepQry;
     }
 
     /**
@@ -84,4 +100,32 @@ public class QueryParserResultSelect {
     public boolean splitNeeded() {
         return twoStepQry != null;
     }
+
+    /**
+     * @return Involved cache IDs.
+     */
+    public List<Integer> cacheIds() {
+        return cacheIds;
+    }
+
+    /**
+     * @return ID of the first MVCC cache.
+     */
+    public Integer mvccCacheId() {
+        return mvccCacheId;
+    }
+
+    /**
+     * @return Whether this is a SELECT for MVCC caches.
+     */
+    public boolean mvccEnabled() {
+        return mvccCacheId != null;
+    }
+
+    /**
+     * @return Whether this is FOR UPDATE query.
+     */
+    public boolean forUpdate() {
+        return forUpdate;
+    }
 }
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
index 709ce17..a2030ca 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
@@ -88,27 +88,25 @@ public final class UpdatePlanBuilder {
      * Generate SELECT statements to retrieve data for modifications from and 
find fast UPDATE or DELETE args,
      * if available.
      *
+     * @param schemaName Schema name.
      * @param stmt Statement.
      * @param mvccEnabled MVCC enabled flag.
-     * @param loc Local query flag.
      * @param idx Indexing.
-     * @param conn Connection.
      * @param fieldsQry Original query.
      * @return Update plan.
      */
     @SuppressWarnings("ConstantConditions")
     public static UpdatePlan planForStatement(
+        String schemaName,
         GridSqlStatement stmt,
         boolean mvccEnabled,
-        boolean loc,
         IgniteH2Indexing idx,
-        @Nullable Connection conn,
         @Nullable SqlFieldsQuery fieldsQry
     ) throws IgniteCheckedException {
         if (stmt instanceof GridSqlMerge || stmt instanceof GridSqlInsert)
-            return planForInsert(stmt, loc, idx, mvccEnabled, conn, fieldsQry);
+            return planForInsert(schemaName, stmt, idx, mvccEnabled, 
fieldsQry);
         else if (stmt instanceof GridSqlUpdate || stmt instanceof 
GridSqlDelete)
-            return planForUpdate(stmt, loc, idx, mvccEnabled, conn, fieldsQry);
+            return planForUpdate(schemaName, stmt, idx, mvccEnabled, 
fieldsQry);
         else
             throw new IgniteSQLException("Unsupported operation: " + 
stmt.getSQL(),
                 IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
@@ -117,19 +115,22 @@ public final class UpdatePlanBuilder {
     /**
      * Prepare update plan for INSERT or MERGE.
      *
+     * @param schemaName Schema name.
      * @param stmt INSERT or MERGE statement.
-     * @param loc Local query flag.
      * @param idx Indexing.
      * @param mvccEnabled Mvcc flag.
-     * @param conn Connection.
      * @param fieldsQuery Original query.
      * @return Update plan.
      * @throws IgniteCheckedException if failed.
      */
     @SuppressWarnings("ConstantConditions")
-    private static UpdatePlan planForInsert(GridSqlStatement stmt, boolean 
loc, IgniteH2Indexing idx,
-        boolean mvccEnabled, @Nullable Connection conn, @Nullable 
SqlFieldsQuery fieldsQuery)
-        throws IgniteCheckedException {
+    private static UpdatePlan planForInsert(
+        String schemaName,
+        GridSqlStatement stmt,
+        IgniteH2Indexing idx,
+        boolean mvccEnabled,
+        @Nullable SqlFieldsQuery fieldsQuery
+    ) throws IgniteCheckedException {
         GridSqlQuery sel = null;
 
         GridSqlElement target;
@@ -251,9 +252,18 @@ public final class UpdatePlanBuilder {
 
         String selectSql = sel != null ? sel.getSQL() : null;
 
-        DmlDistributedPlanInfo distributed = (rowsNum == 0 && 
!F.isEmpty(selectSql)) ?
-            checkPlanCanBeDistributed(idx, mvccEnabled, conn, fieldsQuery, 
loc, selectSql,
-            tbl.dataTable().cacheName()) : null;
+        DmlDistributedPlanInfo distributed = null;
+
+        if (rowsNum == 0 && !F.isEmpty(selectSql)) {
+            distributed = checkPlanCanBeDistributed(
+                idx,
+                mvccEnabled,
+                schemaName,
+                fieldsQuery,
+                selectSql,
+                tbl.dataTable().cacheName()
+            );
+        }
 
         UpdateMode mode = stmt instanceof GridSqlMerge ? UpdateMode.MERGE : 
UpdateMode.INSERT;
 
@@ -323,21 +333,19 @@ public final class UpdatePlanBuilder {
     /**
      * Prepare update plan for UPDATE or DELETE.
      *
+     * @param schemaName Schema name.
      * @param stmt UPDATE or DELETE statement.
-     * @param loc Local query flag.
      * @param idx Indexing.
-     * @param mvccEnabled Mvcc flag.
-     * @param conn Connection.
+     * @param mvccEnabled MVCC flag.
      * @param fieldsQuery Original query.
      * @return Update plan.
      * @throws IgniteCheckedException if failed.
      */
     private static UpdatePlan planForUpdate(
+        String schemaName,
         GridSqlStatement stmt,
-        boolean loc,
         IgniteH2Indexing idx,
         boolean mvccEnabled,
-        @Nullable Connection conn,
         @Nullable SqlFieldsQuery fieldsQuery
     ) throws IgniteCheckedException {
         GridSqlElement target;
@@ -428,9 +436,18 @@ public final class UpdatePlanBuilder {
 
                 String selectSql = sel.getSQL();
 
-                DmlDistributedPlanInfo distributed = F.isEmpty(selectSql) ? 
null :
-                    checkPlanCanBeDistributed(idx, mvccEnabled, conn, 
fieldsQuery, loc, selectSql,
-                    tbl.dataTable().cacheName());
+                DmlDistributedPlanInfo distributed = null;
+
+                if (!F.isEmpty(selectSql)) {
+                    distributed = checkPlanCanBeDistributed(
+                        idx,
+                        mvccEnabled,
+                        schemaName,
+                        fieldsQuery,
+                        selectSql,
+                        tbl.dataTable().cacheName()
+                    );
+                }
 
                 return new UpdatePlan(
                     UpdateMode.UPDATE,
@@ -454,9 +471,18 @@ public final class UpdatePlanBuilder {
 
                 String selectSql = sel.getSQL();
 
-                DmlDistributedPlanInfo distributed = F.isEmpty(selectSql) ? 
null :
-                    checkPlanCanBeDistributed(idx, mvccEnabled, conn, 
fieldsQuery, loc, selectSql,
-                    tbl.dataTable().cacheName());
+                DmlDistributedPlanInfo distributed = null;
+
+                if (!F.isEmpty(selectSql)) {
+                    distributed = checkPlanCanBeDistributed(
+                        idx,
+                        mvccEnabled,
+                        schemaName,
+                        fieldsQuery,
+                        selectSql,
+                        tbl.dataTable().cacheName()
+                    );
+                }
 
                 return new UpdatePlan(
                     UpdateMode.DELETE,
@@ -836,23 +862,26 @@ public final class UpdatePlanBuilder {
      *
      * @param idx Indexing.
      * @param mvccEnabled Mvcc flag.
-     * @param conn Connection.
+     * @param schemaName Schema name.
      * @param fieldsQry Initial update query.
-     * @param loc Local query flag.
      * @param selectQry Derived select query.
      * @param cacheName Cache name.
      * @return distributed update plan info, or {@code null} if cannot be 
distributed.
      * @throws IgniteCheckedException if failed.
      */
-    private static DmlDistributedPlanInfo 
checkPlanCanBeDistributed(IgniteH2Indexing idx, boolean mvccEnabled,
-        Connection conn, SqlFieldsQuery fieldsQry, boolean loc, String 
selectQry, String cacheName)
+    private static DmlDistributedPlanInfo checkPlanCanBeDistributed(
+        IgniteH2Indexing idx,
+        boolean mvccEnabled,
+        String schemaName,
+        SqlFieldsQuery fieldsQry,
+        String selectQry,
+        String cacheName
+    )
         throws IgniteCheckedException {
-        if (loc || (!mvccEnabled && !isSkipReducerOnUpdateQuery(fieldsQry)) || 
DmlUtils.isBatched(fieldsQry))
+        if ((!mvccEnabled && !isSkipReducerOnUpdateQuery(fieldsQry)) || 
DmlUtils.isBatched(fieldsQry))
             return null;
 
-        assert conn != null;
-
-        try {
+        try (Connection conn = 
idx.connections().connectionNoCache(schemaName)) {
             // Get a new prepared statement for derived select query.
             try (PreparedStatement stmt = conn.prepareStatement(selectQry)) {
                 H2Utils.bindParameters(stmt, F.asList(fieldsQry.getArgs()));
@@ -869,13 +898,12 @@ public final class UpdatePlanBuilder {
                 );
 
                 boolean distributed =
-                    !qry.isLocalSplit() &&                                     
               // No split for local qry
+                    !qry.isLocalSplit() &&                                     
               // No split for local
                     qry.hasCacheIds() &&                                       
               // Over real caches
                     qry.skipMergeTable() &&                                    
               // No merge table
                     qry.mapQueries().size() == 1 && 
!qry.mapQueries().get(0).hasSubQueries(); // One w/o subqueries
 
                 if (distributed) {
-                    // TODO: This should be done during plan build.
                     List<Integer> cacheIds = H2Utils.collectCacheIds(idx, 
CU.cacheId(cacheName), qry.tables());
 
                     H2Utils.checkQuery(idx, cacheIds, qry.mvccEnabled(), 
qry.forUpdate(), qry.tables());
@@ -897,7 +925,7 @@ public final class UpdatePlanBuilder {
      * @param qry Query.
      * @return {@code true} if update can be distributed.
      */
-    public static boolean isSkipReducerOnUpdateQuery(SqlFieldsQuery qry) {
+    private static boolean isSkipReducerOnUpdateQuery(SqlFieldsQuery qry) {
         return qry != null && !qry.isLocal() &&
             qry instanceof SqlFieldsQueryEx && 
((SqlFieldsQueryEx)qry).isSkipReducerOnUpdate();
     }
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
index 4d298ff..7417ba2 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
@@ -582,11 +582,7 @@ public class GridSqlQueryParser {
 
     /** */
     private static Command extractCommand(PreparedStatement stmt) {
-        try {
-            return COMMAND.get(stmt.unwrap(JdbcPreparedStatement.class));
-        } catch (SQLException e) {
-            throw new IgniteSQLException(e);
-        }
+        return COMMAND.get((JdbcPreparedStatement)stmt);
     }
 
     /**
@@ -1784,7 +1780,7 @@ public class GridSqlQueryParser {
     /**
      * @return All known cache IDs.
      */
-    public Collection<Integer> cacheIds() {
+    public List<Integer> cacheIds() {
         ArrayList<Integer> res = new ArrayList<>(1);
 
         for (Object o : h2ObjToGridObj.values()) {
@@ -2303,6 +2299,16 @@ public class GridSqlQueryParser {
     public static boolean isStreamableInsertStatement(PreparedStatement 
nativeStmt) {
         Prepared prep = prepared(nativeStmt);
 
+        return isStreamableInsertStatement(prep);
+    }
+
+    /**
+     * Check if passed statement is insert statement eligible for streaming.
+     *
+     * @param prep Prepared statement.
+     * @return {@code True} if streamable insert.
+     */
+    public static boolean isStreamableInsertStatement(Prepared prep) {
         return prep instanceof Insert && INSERT_QUERY.get((Insert)prep) == 
null;
     }
 
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/RunningQueriesTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/RunningQueriesTest.java
index 15c5abe..e7f3b8c 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/RunningQueriesTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/RunningQueriesTest.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.query;
 
 import java.sql.Connection;
-import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Collection;
@@ -554,43 +553,6 @@ public class RunningQueriesTest extends 
AbstractIndexingCommonTest {
     }
 
     /**
-     * Check tracking running queries for stream batching.
-     *
-     * @throws Exception in case of failure.
-     */
-    @Test
-    public void testJdbcStreamBatchUpdate() throws Exception {
-        try (Connection conn = GridTestUtils.connect(ignite, null); Statement 
stmt = conn.createStatement()) {
-            conn.setSchema("\"default\"");
-
-            newBarrier(1);
-
-            final int BATCH_SIZE = 10;
-
-            stmt.executeUpdate("SET STREAMING ON BATCH_SIZE " + BATCH_SIZE);
-
-            newBarrier(2);
-
-            for (int i = 0; i < BATCH_SIZE; i++)
-                stmt.addBatch("insert into Integer (_key, _val) values (" + i 
+ "," + i + ")");
-
-            for (int i = 0; i < BATCH_SIZE; i++) {
-                assertWaitingOnBarrier();
-
-                awaitTimeouted();
-
-                assertWaitingOnBarrier();
-
-                Collection<GridRunningQueryInfo> runningQueries = 
ignite.context().query().runningQueries(-1);
-
-                assertEquals(1, runningQueries.size());
-
-                awaitTimeouted();
-            }
-        }
-    }
-
-    /**
      * Check tracking running queries for stream COPY command.
      *
      * @throws SQLException If failed.
@@ -682,18 +644,6 @@ public class RunningQueriesTest extends 
AbstractIndexingCommonTest {
      */
     private static class BlockingIndexing extends IgniteH2Indexing {
         /** {@inheritDoc} */
-        @Override public void checkStatementStreamable(PreparedStatement 
nativeStmt) {
-            super.checkStatementStreamable(nativeStmt);
-
-            try {
-                barrier.await();
-            }
-            catch (Exception e) {
-                throw new IgniteException(e);
-            }
-        }
-
-        /** {@inheritDoc} */
         @Override public List<FieldsQueryCursor<List<?>>> 
querySqlFields(String schemaName, SqlFieldsQuery qry,
             @Nullable SqlClientContext cliCtx, boolean keepBinary, boolean 
failOnMultipleStmts,
             MvccQueryTracker tracker, GridQueryCancel cancel, boolean 
registerAsNewQry) {
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/H2StatementCacheSelfTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/H2StatementCacheSelfTest.java
deleted file mode 100644
index de1fb09..0000000
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/H2StatementCacheSelfTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2;
-
-import java.sql.PreparedStatement;
-import 
org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
-import org.junit.Test;
-
-/**
- *
- */
-public class H2StatementCacheSelfTest extends AbstractIndexingCommonTest {
-    /**
-     * @throws Exception If failed.
-     */
-    @Test
-    public void testEviction() throws Exception {
-        H2StatementCache stmtCache = new H2StatementCache(1);
-        H2CachedStatementKey key1 = new H2CachedStatementKey("", "1");
-        PreparedStatement stmt1 = stmt();
-        stmtCache.put(key1, stmt1);
-
-        assertSame(stmt1, stmtCache.get(key1));
-
-        stmtCache.put(new H2CachedStatementKey("mydb", "2"), stmt());
-
-        assertNull(stmtCache.get(key1));
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    @Test
-    public void testLruEvictionInStoreOrder() throws Exception {
-        H2StatementCache stmtCache = new H2StatementCache(2);
-
-        H2CachedStatementKey key1 = new H2CachedStatementKey("", "1");
-        H2CachedStatementKey key2 = new H2CachedStatementKey("", "2");
-        stmtCache.put(key1, stmt());
-        stmtCache.put(key2, stmt());
-
-        stmtCache.put(new H2CachedStatementKey("", "3"), stmt());
-
-        assertNull(stmtCache.get(key1));
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    @Test
-    public void testLruEvictionInAccessOrder() throws Exception {
-        H2StatementCache stmtCache = new H2StatementCache(2);
-
-        H2CachedStatementKey key1 = new H2CachedStatementKey("", "1");
-        H2CachedStatementKey key2 = new H2CachedStatementKey("", "2");
-        stmtCache.put(key1, stmt());
-        stmtCache.put(key2, stmt());
-        stmtCache.get(key1);
-
-        stmtCache.put(new H2CachedStatementKey("", "3"), stmt());
-
-        assertNull(stmtCache.get(key2));
-    }
-
-    /**
-     *
-     */
-    private static PreparedStatement stmt() {
-        return new PreparedStatementExImpl(null);
-    }
-}
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/PreparedStatementExSelfTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/PreparedStatementExSelfTest.java
deleted file mode 100644
index f0c7986..0000000
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/PreparedStatementExSelfTest.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2;
-
-import java.sql.PreparedStatement;
-import 
org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
-import org.junit.Test;
-
-/**
- *
- */
-public class PreparedStatementExSelfTest extends AbstractIndexingCommonTest {
-    /**
-     * @throws Exception If failed.
-     */
-    @Test
-    public void testStoringMeta() throws Exception {
-        PreparedStatement stmt = stmt();
-
-        PreparedStatementEx wrapped = stmt.unwrap(PreparedStatementEx.class);
-
-        wrapped.putMeta(0, "0");
-
-        assertEquals("0", wrapped.meta(0));
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    @Test
-    public void testStoringMoreMetaKeepsExisting() throws Exception {
-        PreparedStatement stmt = stmt();
-
-        PreparedStatementEx wrapped = stmt.unwrap(PreparedStatementEx.class);
-
-        wrapped.putMeta(0, "0");
-        wrapped.putMeta(1, "1");
-
-        assertEquals("0", wrapped.meta(0));
-        assertEquals("1", wrapped.meta(1));
-    }
-
-    /**
-     *
-     */
-    private static PreparedStatement stmt() {
-        return new PreparedStatementExImpl(null);
-    }
-}
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
index 095518c..0093108 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
@@ -205,10 +205,8 @@ import 
org.apache.ignite.internal.processors.query.SqlSchemaSelfTest;
 import org.apache.ignite.internal.processors.query.SqlSystemViewsSelfTest;
 import org.apache.ignite.internal.processors.query.h2.GridIndexRebuildSelfTest;
 import 
org.apache.ignite.internal.processors.query.h2.H2ResultSetIteratorNullifyOnEndSelfTest;
-import org.apache.ignite.internal.processors.query.h2.H2StatementCacheSelfTest;
 import 
org.apache.ignite.internal.processors.query.h2.IgniteSqlBigIntegerKeyTest;
 import org.apache.ignite.internal.processors.query.h2.IgniteSqlQueryMinMaxTest;
-import 
org.apache.ignite.internal.processors.query.h2.PreparedStatementExSelfTest;
 import org.apache.ignite.internal.processors.query.h2.QueryDataPageScanTest;
 import 
org.apache.ignite.internal.processors.query.h2.ThreadLocalObjectPoolSelfTest;
 import 
org.apache.ignite.internal.processors.query.h2.sql.BaseH2CompareQueryTest;
@@ -513,8 +511,6 @@ import org.junit.runners.Suite;
     EncryptedSqlTableTest.class,
 
     ThreadLocalObjectPoolSelfTest.class,
-    H2StatementCacheSelfTest.class,
-    PreparedStatementExSelfTest.class,
 
     // Partition loss.
     IndexingCachePartitionLossPolicySelfTest.class,

Reply via email to