This is an automated email from the ASF dual-hosted git repository.
vozerov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 1619dd5 IGNITE-11326: SQL: Common parsing routine for all command
types. This closes #6112.
1619dd5 is described below
commit 1619dd5603f9b9eb71dbffc70add54df0a290e2c
Author: devozerov <[email protected]>
AuthorDate: Fri Feb 15 17:05:42 2019 +0300
IGNITE-11326: SQL: Common parsing routine for all command types. This
closes #6112.
---
.../dht/colocated/GridDhtColocatedCache.java | 4 +-
.../cache/distributed/near/GridNearTxLocal.java | 4 +-
.../internal/processors/cache/mvcc/MvccUtils.java | 6 +-
.../cache/query/GridCacheQueryAdapter.java | 2 +-
.../processors/odbc/jdbc/JdbcRequestHandler.java | 8 +-
.../processors/odbc/odbc/OdbcRequestHandler.java | 7 +-
.../processors/query/GridQueryProcessor.java | 2 +
.../processors/query/h2/ConnectionManager.java | 27 +-
.../processors/query/h2/H2CachedStatementKey.java | 13 +-
.../internal/processors/query/h2/H2Utils.java | 5 +-
.../processors/query/h2/IgniteH2Indexing.java | 531 ++++++-----------
.../processors/query/h2/PreparedStatementEx.java | 48 --
.../query/h2/PreparedStatementExImpl.java | 648 ---------------------
.../internal/processors/query/h2/QueryParser.java | 320 ++++++----
.../processors/query/h2/QueryParserResultDml.java | 24 +-
.../query/h2/QueryParserResultSelect.java | 76 ++-
.../processors/query/h2/dml/UpdatePlanBuilder.java | 100 ++--
.../query/h2/sql/GridSqlQueryParser.java | 18 +-
.../processors/query/RunningQueriesTest.java | 50 --
.../query/h2/H2StatementCacheSelfTest.java | 86 ---
.../query/h2/PreparedStatementExSelfTest.java | 64 --
.../IgniteBinaryCacheQueryTestSuite.java | 4 -
22 files changed, 600 insertions(+), 1447 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 281669e..1c60aae 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -237,7 +237,7 @@ public class GridDhtColocatedCache<K, V> extends
GridDhtTransactionalCacheAdapte
if (ctx.mvccEnabled()) {
try {
if (tx != null)
- mvccSnapshot = MvccUtils.requestSnapshot(ctx, tx);
+ mvccSnapshot = MvccUtils.requestSnapshot(tx);
else {
mvccTracker = MvccUtils.mvccTracker(ctx, null);
@@ -342,7 +342,7 @@ public class GridDhtColocatedCache<K, V> extends
GridDhtTransactionalCacheAdapte
if (ctx.mvccEnabled()) {
try {
if (tx != null)
- mvccSnapshot = MvccUtils.requestSnapshot(ctx, tx);
+ mvccSnapshot = MvccUtils.requestSnapshot(tx);
else {
mvccTracker = MvccUtils.mvccTracker(ctx, null);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index d6fcf50..db673e5 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -718,7 +718,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter
implements GridTimeou
@Nullable final CacheEntryPredicate filter
) {
try {
- MvccUtils.requestSnapshot(cacheCtx, this);
+ MvccUtils.requestSnapshot(this);
beforePut(cacheCtx, retval, true);
}
@@ -1898,7 +1898,7 @@ public class GridNearTxLocal extends
GridDhtTxLocalAdapter implements GridTimeou
@Nullable final CacheEntryPredicate filter
) {
try {
- MvccUtils.requestSnapshot(cacheCtx, this);
+ MvccUtils.requestSnapshot(this);
beforeRemove(cacheCtx, retval, true);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
index 225af81..cf7b62b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
@@ -817,7 +817,7 @@ public class MvccUtils {
if (tx == null)
tracker = new MvccQueryTrackerImpl(cctx);
else
- tracker = new StaticMvccQueryTracker(cctx, requestSnapshot(cctx,
tx));
+ tracker = new StaticMvccQueryTracker(cctx, requestSnapshot(tx));
if (tracker.snapshot() == null)
// TODO IGNITE-7388
@@ -827,13 +827,11 @@ public class MvccUtils {
}
/**
- * @param cctx Cache context.
* @param tx Transaction.
* @throws IgniteCheckedException If failed.
* @return Mvcc snapshot.
*/
- public static MvccSnapshot requestSnapshot(GridCacheContext cctx,
- @NotNull GridNearTxLocal tx) throws IgniteCheckedException {
+ public static MvccSnapshot requestSnapshot(@NotNull GridNearTxLocal tx)
throws IgniteCheckedException {
MvccSnapshot snapshot = tx.mvccSnapshot();
if (snapshot == null)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index 9fc02d7..f69dd0c 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -594,7 +594,7 @@ public class GridCacheQueryAdapter<T> implements
CacheQuery<T> {
GridNearTxLocal tx = cctx.tm().userTx();
if (tx != null)
- mvccSnapshot = MvccUtils.requestSnapshot(cctx, tx);
+ mvccSnapshot = MvccUtils.requestSnapshot(tx);
else {
mvccTracker = MvccUtils.mvccTracker(cctx, null);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
index 7a28a19..c2db8e0 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
@@ -906,8 +906,12 @@ public class JdbcRequestHandler implements
ClientListenerRequestHandler {
IgniteBiTuple<Integer, String> firstErr, GridQueryCancel cancel)
throws QueryCancelledException {
try {
if (cliCtx.isStream()) {
- List<Long> cnt =
ctx.query().streamBatchedUpdateQuery(qry.getSchema(), cliCtx, qry.getSql(),
- qry.batchedArguments());
+ List<Long> cnt = ctx.query().streamBatchedUpdateQuery(
+ qry.getSchema(),
+ cliCtx,
+ qry.getSql(),
+ qry.batchedArguments()
+ );
for (int i = 0; i < cnt.size(); i++)
updCntsAcc.add(cnt.get(i).intValue());
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java
index 43fedae..abc8085 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java
@@ -558,7 +558,12 @@ public class OdbcRequestHandler implements
ClientListenerRequestHandler {
try {
assert cliCtx.isStream();
- ctx.query().streamBatchedUpdateQuery(qry.getSchema(), cliCtx,
qry.getSql(), qry.batchedArguments());
+ ctx.query().streamBatchedUpdateQuery(
+ qry.getSchema(),
+ cliCtx,
+ qry.getSql(),
+ qry.batchedArguments()
+ );
}
catch (Exception e) {
U.error(log, "Failed to execute batch query [qry=" + qry +']', e);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index e099a1c..95f5e3b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -2321,6 +2321,8 @@ public class GridQueryProcessor extends
GridProcessorAdapter {
*/
public List<Long> streamBatchedUpdateQuery(final String schemaName, final
SqlClientContext cliCtx,
final String qry, final List<Object[]> args) {
+ checkxEnabled();
+
if (!busyLock.enterBusy())
throw new IllegalStateException("Failed to execute query (grid is
stopping).");
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ConnectionManager.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ConnectionManager.java
index 8c1e89c..18a842b 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ConnectionManager.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ConnectionManager.java
@@ -302,14 +302,25 @@ public class ConnectionManager {
PreparedStatement stmt = cache.get(key);
- if (stmt != null && !stmt.isClosed() &&
!stmt.unwrap(JdbcStatement.class).isCancelled() &&
- !GridSqlQueryParser.prepared(stmt).needRecompile()) {
- assert stmt.getConnection() == c;
+ // Nothing found.
+ if (stmt == null)
+ return null;
+
+ // TODO: Remove thread local caching at all. Just keep per-connection
statement cache.
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-11211
+ // Statement is not from the given connection.
+ if (stmt.getConnection() != c)
+ return null;
+
+ // Is statement still valid?
+ if (
+ stmt.isClosed() || // Closed.
+ stmt.unwrap(JdbcStatement.class).isCancelled() || // Cancelled.
+ GridSqlQueryParser.prepared(stmt).needRecompile() // Outdated
(schema has been changed concurrently).
+ )
+ return null;
- return stmt;
- }
-
- return null;
+ return stmt;
}
/**
@@ -328,7 +339,7 @@ public class ConnectionManager {
H2CachedStatementKey key = new H2CachedStatementKey(c.getSchema(),
sql);
- stmt = PreparedStatementExImpl.wrap(prepareStatementNoCache(c,
sql));
+ stmt = prepareStatementNoCache(c, sql);
cache.put(key, stmt);
}
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2CachedStatementKey.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2CachedStatementKey.java
index 300ed6c..360c2ad 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2CachedStatementKey.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2CachedStatementKey.java
@@ -41,8 +41,8 @@ class H2CachedStatementKey {
* @param schemaName Schema name.
* @param sql SQL.
*/
- H2CachedStatementKey(String schemaName, String sql) {
- this(schemaName, sql, null, false);
+ public H2CachedStatementKey(String schemaName, String sql) {
+ this(schemaName, sql, null);
}
/**
@@ -51,19 +51,20 @@ class H2CachedStatementKey {
* @param schemaName Schema name.
* @param sql SQL.
* @param fieldsQry Query with flags.
- * @param loc DML {@code SELECT} Locality flag.
*/
- public H2CachedStatementKey(String schemaName, String sql, SqlFieldsQuery
fieldsQry, boolean loc) {
+ public H2CachedStatementKey(String schemaName, String sql, SqlFieldsQuery
fieldsQry) {
this.schemaName = schemaName;
this.sql = sql;
- if (fieldsQry == null || loc ||
!UpdatePlanBuilder.isSkipReducerOnUpdateQuery(fieldsQry))
+ if (fieldsQry == null)
this.flags = 0; // flags only relevant for server side updates.
else {
this.flags = (byte)(1 +
(fieldsQry.isDistributedJoins() ? 2 : 0) +
(fieldsQry.isEnforceJoinOrder() ? 4 : 0) +
- (fieldsQry.isCollocated() ? 8 : 0));
+ (fieldsQry.isCollocated() ? 8 : 0) +
+ (fieldsQry.isLocal() ? 8 : 0)
+ );
}
}
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
index b86a481..f45e574 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
@@ -102,13 +102,16 @@ import javax.cache.CacheException;
import static
org.apache.ignite.internal.processors.query.QueryUtils.KEY_FIELD_NAME;
import static
org.apache.ignite.internal.processors.query.QueryUtils.VAL_FIELD_NAME;
-import static
org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.UPDATE_RESULT_META;
import static
org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser.prepared;
/**
* H2 utility methods.
*/
public class H2Utils {
+ /** Dummy metadata for update result. */
+ public static final List<GridQueryFieldMetadata> UPDATE_RESULT_META =
+ Collections.singletonList(new H2SqlFieldMetadata(null, null,
"UPDATED", Long.class.getName(), -1, -1));
+
/** Spatial index class name. */
private static final String SPATIAL_IDX_CLS =
"org.apache.ignite.internal.processors.query.h2.opt.GridH2SpatialIndex";
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 7f24399..12e8f9e 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -77,7 +77,6 @@ import
org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
import
org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter;
import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
import org.apache.ignite.internal.processors.odbc.SqlStateCode;
-import
org.apache.ignite.internal.processors.query.CacheQueryObjectValueContext;
import org.apache.ignite.internal.processors.query.EnlistOperation;
import
org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
@@ -116,11 +115,8 @@ import
org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
import org.apache.ignite.internal.processors.query.h2.opt.QueryContext;
import org.apache.ignite.internal.processors.query.h2.opt.QueryContextRegistry;
-import org.apache.ignite.internal.processors.query.h2.sql.GridSqlAlias;
-import org.apache.ignite.internal.processors.query.h2.sql.GridSqlAst;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlStatement;
-import org.apache.ignite.internal.processors.query.h2.sql.GridSqlTable;
import
org.apache.ignite.internal.processors.query.h2.twostep.GridMapQueryExecutor;
import
org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor;
import
org.apache.ignite.internal.processors.query.h2.twostep.MapQueryLazyWorker;
@@ -161,7 +157,6 @@ import org.apache.ignite.spi.indexing.IndexingQueryFilter;
import org.apache.ignite.spi.indexing.IndexingQueryFilterImpl;
import org.h2.api.ErrorCode;
import org.h2.api.JavaObjectSerializer;
-import org.h2.command.Prepared;
import org.h2.engine.Session;
import org.h2.engine.SysProperties;
import org.h2.table.IndexColumn;
@@ -175,8 +170,12 @@ import static
org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.request
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.tx;
import static
org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.txStart;
import static
org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.TEXT;
-import static
org.apache.ignite.internal.processors.query.h2.PreparedStatementEx.MVCC_CACHE_ID;
-import static
org.apache.ignite.internal.processors.query.h2.PreparedStatementEx.MVCC_STATE;
+import static
org.apache.ignite.internal.processors.query.h2.H2Utils.UPDATE_RESULT_META;
+import static
org.apache.ignite.internal.processors.query.h2.H2Utils.bindParameters;
+import static
org.apache.ignite.internal.processors.query.h2.H2Utils.generateFieldsQueryString;
+import static org.apache.ignite.internal.processors.query.h2.H2Utils.session;
+import static
org.apache.ignite.internal.processors.query.h2.H2Utils.validateTypeDescriptor;
+import static
org.apache.ignite.internal.processors.query.h2.H2Utils.zeroCursor;
import static
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest.isDataPageScanEnabled;
/**
@@ -199,10 +198,6 @@ public class IgniteH2Indexing implements GridQueryIndexing
{
H2ExtrasLeafIO.register();
}
- /** Dummy metadata for update result. */
- public static final List<GridQueryFieldMetadata> UPDATE_RESULT_META =
- Collections.singletonList(new H2SqlFieldMetadata(null, null,
"UPDATED", Long.class.getName(), -1, -1));
-
/** Default number of attempts to re-run DELETE and UPDATE queries in case
of concurrent modifications of values. */
private static final int DFLT_UPDATE_RERUN_ATTEMPTS = 4;
@@ -242,9 +237,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/** */
protected volatile GridKernalContext ctx;
- /** Cache object value context. */
- protected CacheQueryObjectValueContext valCtx;
-
/** Query context registry. */
private final QueryContextRegistry qryCtxRegistry = new
QueryContextRegistry();
@@ -288,22 +280,6 @@ public class IgniteH2Indexing implements GridQueryIndexing
{
return ctx;
}
- /**
- * @param c Connection.
- * @param sql SQL.
- * @return <b>Cached</b> prepared statement.
- */
- @SuppressWarnings("ConstantConditions")
- @Nullable private PreparedStatement cachedStatement(Connection c, String
sql) {
- try {
- return connMgr.cachedPreparedStatement(c, sql);
- }
- catch (SQLException e) {
- // We actually don't except anything SQL related here as we're
supposed to work with cache only.
- throw new AssertionError(e);
- }
- }
-
/** {@inheritDoc} */
@Override public PreparedStatement prepareNativeStatement(String
schemaName, String sql) {
Connection conn = connMgr.connectionForThread().connection(schemaName);
@@ -490,63 +466,64 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
assert mvccEnabled || mvccTracker == null;
try {
- final Connection conn =
connMgr.connectionForThread().connection(schemaName);
-
- H2Utils.setupConnection(conn, false, enforceJoinOrder);
+ SqlFieldsQuery fieldsQry = new SqlFieldsQuery(qry)
+ .setLocal(true)
+ .setEnforceJoinOrder(enforceJoinOrder)
+ .setDataPageScanEnabled(dataPageScanEnabled)
+ .setTimeout(qryTimeout, TimeUnit.MILLISECONDS);
- PreparedStatement stmt = preparedStatementWithParams(conn, qry,
params, true);
+ if (params != null)
+ fieldsQry.setArgs(params.toArray());
- if (GridSqlQueryParser.checkMultipleStatements(stmt))
- throw new IgniteSQLException("Multiple statements queries are
not supported for local queries");
+ QueryParserResult parseRes = parser.parse(schemaName, fieldsQry,
false);
- Prepared p = GridSqlQueryParser.prepared(stmt);
-
- if (GridSqlQueryParser.isDml(p)) {
- QueryParserResultDml dml = parser.prepareDmlStatement(p);
-
- SqlFieldsQuery fldsQry = new SqlFieldsQuery(qry);
-
- if (params != null)
- fldsQry.setArgs(params.toArray());
-
- fldsQry.setEnforceJoinOrder(enforceJoinOrder);
- fldsQry.setTimeout(qryTimeout, TimeUnit.MILLISECONDS);
- fldsQry.setDataPageScanEnabled(dataPageScanEnabled);
-
- UpdateResult updRes = executeUpdate(schemaName, conn, dml,
fldsQry, true, filter, cancel);
+ if (parseRes.isDml()) {
+ UpdateResult updRes = executeUpdate(schemaName,
parseRes.dml(), fieldsQry, true, filter, cancel);
List<?> updResRow =
Collections.singletonList(updRes.counter());
return new GridQueryFieldsResultAdapter(UPDATE_RESULT_META,
new IgniteSingletonIterator<>(updResRow));
}
- else if (CommandProcessor.isCommand(p)) {
+ else if (parseRes.isCommand()) {
throw new IgniteSQLException("DDL statements are supported for
the whole cluster only.",
IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
}
+ assert parseRes.isSelect();
+
+ QueryParserResultSelect select = parseRes.select();
+
+ assert select != null;
+
MvccSnapshot mvccSnapshot = null;
- boolean forUpdate = GridSqlQueryParser.isForUpdateQuery(p);
+ boolean forUpdate = select.forUpdate();
if (forUpdate && !mvccEnabled)
throw new IgniteSQLException("SELECT FOR UPDATE query requires
transactional " +
"cache with MVCC enabled.",
IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
- if (ctx.security().enabled()) {
- GridSqlQueryParser parser = new GridSqlQueryParser(false);
-
- parser.parse(p);
-
- checkSecurity(parser.cacheIds());
- }
+ if (ctx.security().enabled())
+ checkSecurity(select.cacheIds());
GridNearTxSelectForUpdateFuture sfuFut = null;
int opTimeout = qryTimeout;
if (mvccEnabled) {
- if (mvccTracker == null)
- mvccTracker = mvccTracker(stmt, startTx);
+ if (mvccTracker == null) {
+ Integer mvccCacheId = select.mvccCacheId();
+
+ if (mvccCacheId != null) {
+ GridCacheContext mvccCacheCtx =
ctx.cache().context().cacheContext(select.mvccCacheId());
+
+ if (mvccCacheCtx == null)
+ throw new IgniteCheckedException("Cache has been
stopped concurrently [cacheId=" +
+ mvccCacheId + ']');
+
+ mvccTracker = MvccUtils.mvccTracker(mvccCacheCtx,
startTx);
+ }
+ }
if (mvccTracker != null) {
mvccSnapshot = mvccTracker.snapshot();
@@ -561,11 +538,7 @@ public class IgniteH2Indexing implements GridQueryIndexing
{
throw new IgniteSQLException("SELECT FOR UPDATE query
requires transactional " +
"cache with MVCC enabled.",
IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
- GridSqlStatement stmt0 = new
GridSqlQueryParser(false).parse(p);
-
- qry =
GridSqlQueryParser.rewriteQueryForUpdateIfNeeded(stmt0, forUpdate = tx != null);
-
- stmt = preparedStatementWithParams(conn, qry, params,
true);
+ qry =
GridSqlQueryParser.rewriteQueryForUpdateIfNeeded(select.statement(), forUpdate
= tx != null);
if (forUpdate) {
GridCacheContext cctx = mvccTracker.context();
@@ -588,7 +561,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
GridNearTxLocal tx0 = tx;
MvccQueryTracker mvccTracker0 = mvccTracker;
GridNearTxSelectForUpdateFuture sfuFut0 = sfuFut;
- PreparedStatement stmt0 = stmt;
String qry0 = qry;
int timeout0 = opTimeout;
@@ -606,10 +578,27 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
qryCtxRegistry.setThreadLocal(qctx);
- ThreadLocalObjectPool<H2ConnectionWrapper>.Reusable
detachedConn = connMgr.detachThreadConnection();
+ ThreadLocalObjectPool<H2ConnectionWrapper>.Reusable conn =
connMgr.detachThreadConnection();
try {
- ResultSet rs = executeSqlQueryWithTimer(stmt0, conn,
qry0, params, timeout0, cancel, dataPageScanEnabled);
+ Connection conn0 =
conn.object().connection(schemaName);
+
+ PreparedStatement stmt = preparedStatementWithParams(
+ conn0,
+ qry0,
+ params,
+ true
+ );
+
+ ResultSet rs = executeSqlQueryWithTimer(
+ stmt,
+ conn0,
+ qry0,
+ params,
+ timeout0,
+ cancel,
+ dataPageScanEnabled
+ );
if (sfuFut0 != null) {
assert tx0.mvccSnapshot() != null;
@@ -662,10 +651,10 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
}
}
- return new H2FieldsIterator(rs, mvccTracker0, sfuFut0
!= null, detachedConn);
+ return new H2FieldsIterator(rs, mvccTracker0, sfuFut0
!= null, conn);
}
catch (IgniteCheckedException | RuntimeException | Error
e) {
- detachedConn.recycle();
+ conn.recycle();
try {
if (mvccTracker0 != null)
@@ -707,56 +696,46 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
}
/** {@inheritDoc} */
- @Override public long streamUpdateQuery(String schemaName, String qry,
- @Nullable Object[] params, IgniteDataStreamer<?, ?> streamer) throws
IgniteCheckedException {
- final Connection conn =
connMgr.connectionForThread().connection(schemaName);
-
- final PreparedStatement stmt;
-
- try {
- stmt = connMgr.prepareStatement(conn, qry);
- }
- catch (SQLException e) {
- throw new IgniteSQLException(e);
- }
+ @Override public long streamUpdateQuery(
+ String schemaName,
+ String qry,
+ @Nullable Object[] params,
+ IgniteDataStreamer<?, ?> streamer
+ ) throws IgniteCheckedException {
+ QueryParserResultDml dml = streamerParse(schemaName, qry);
- return streamQuery0(qry, schemaName, streamer, stmt, params);
+ return streamQuery0(qry, schemaName, streamer, dml, params);
}
/** {@inheritDoc} */
- @SuppressWarnings("ForLoopReplaceableByForEach")
- @Override public List<Long> streamBatchedUpdateQuery(String schemaName,
String qry, List<Object[]> params,
- SqlClientContext cliCtx) throws IgniteCheckedException {
+ @SuppressWarnings({"ForLoopReplaceableByForEach", "ConstantConditions"})
+ @Override public List<Long> streamBatchedUpdateQuery(
+ String schemaName,
+ String qry,
+ List<Object[]> params,
+ SqlClientContext cliCtx
+ ) throws IgniteCheckedException {
if (cliCtx == null || !cliCtx.isStream()) {
U.warn(log, "Connection is not in streaming mode.");
return zeroBatchedStreamedUpdateResult(params.size());
}
- final Connection conn =
connMgr.connectionForThread().connection(schemaName);
-
- final PreparedStatement stmt = prepareStatementAndCaches(conn, qry);
-
- if (GridSqlQueryParser.checkMultipleStatements(stmt))
- throw new IgniteSQLException("Multiple statements queries are not
supported for streaming mode.",
- IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
-
- checkStatementStreamable(stmt);
-
- QueryParserResultDml dml = parser.prepareDmlStatement(stmt);
+ QueryParserResultDml dml = streamerParse(schemaName, qry);
- UpdatePlan plan = updatePlan(schemaName, conn, dml, null, true);
-
- IgniteDataStreamer<?, ?> streamer =
cliCtx.streamerForCache(plan.cacheContext().name());
+ IgniteDataStreamer<?, ?> streamer =
cliCtx.streamerForCache(dml.streamTable().cacheName());
assert streamer != null;
- List<Long> res = new ArrayList<>(params.size());
+ List<Long> ress = new ArrayList<>(params.size());
- for (int i = 0; i < params.size(); i++)
- res.add(streamQuery0(qry, schemaName, streamer, stmt,
params.get(i)));
+ for (int i = 0; i < params.size(); i++) {
+ long res = streamQuery0(qry, schemaName, streamer, dml,
params.get(i));
- return res;
+ ress.add(res);
+ }
+
+ return ress;
}
/**
@@ -765,77 +744,51 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
* @param qry Query.
* @param schemaName Schema name.
* @param streamer Streamer to feed data to.
- * @param stmt Statement.
+ * @param dml DML statement.
* @param args Statement arguments.
* @return Number of rows in given INSERT statement.
* @throws IgniteCheckedException if failed.
*/
@SuppressWarnings({"unchecked", "Anonymous2MethodRef"})
- private long streamQuery0(String qry, String schemaName,
IgniteDataStreamer streamer, PreparedStatement stmt,
+ private long streamQuery0(String qry, String schemaName,
IgniteDataStreamer streamer, QueryParserResultDml dml,
final Object[] args) throws IgniteCheckedException {
Long qryId = runningQryMgr.register(qry,
GridCacheQueryType.SQL_FIELDS, schemaName, true, null);
boolean fail = false;
try {
- checkStatementStreamable(stmt);
+ UpdatePlan plan = updatePlan(schemaName, dml, null);
- QueryParserResultDml dml = parser.prepareDmlStatement(stmt);
+ List<List<?>> planRows = plan.createRows(args != null ? args :
X.EMPTY_OBJECT_ARRAY);
- final UpdatePlan plan = updatePlan(schemaName, null, dml, null,
true);
-
- assert plan.isLocalSubquery();
-
- final GridCacheContext cctx = plan.cacheContext();
-
- QueryCursorImpl<List<?>> cur;
-
- final ArrayList<List<?>> data = new ArrayList<>(plan.rowCount());
-
- QueryCursorImpl<List<?>> stepCur = new QueryCursorImpl<>(new
Iterable<List<?>>() {
- @Override public Iterator<List<?>> iterator() {
- try {
- assert F.isEmpty(plan.selectQuery());
-
- Object[] params = args != null ? args :
X.EMPTY_OBJECT_ARRAY;
-
- Iterator<List<?>> it =
plan.createRows(params).iterator();
-
- return new GridQueryCacheObjectsIterator(it,
objectContext(), cctx.keepBinary());
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
- }
- }, null);
-
- data.addAll(stepCur.getAll());
-
- cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
- @Override public Iterator<List<?>> iterator() {
- return data.iterator();
- }
- }, null);
+ Iterator<List<?>> iter = new GridQueryCacheObjectsIterator(
+ planRows.iterator(),
+ objectContext(),
+ true
+ );
- if (plan.rowCount() == 1) {
- IgniteBiTuple t = plan.processRow(cur.iterator().next());
+ if (planRows.size() == 1) {
+ IgniteBiTuple t = plan.processRow(iter.next());
streamer.addData(t.getKey(), t.getValue());
return 1;
}
+ else {
+ Map<Object, Object> rows = new
LinkedHashMap<>(plan.rowCount());
- Map<Object, Object> rows = new LinkedHashMap<>(plan.rowCount());
+ while (iter.hasNext()) {
+ List<?> row = iter.next();
- for (List<?> row : cur) {
- final IgniteBiTuple t = plan.processRow(row);
+ IgniteBiTuple t = plan.processRow(row);
- rows.put(t.getKey(), t.getValue());
- }
+ rows.put(t.getKey(), t.getValue());
+ }
- streamer.addData(rows);
+ streamer.addData(rows);
- return rows.size();
+ return rows.size();
+ }
}
catch (IgniteCheckedException e) {
fail = true;
@@ -848,6 +801,26 @@ public class IgniteH2Indexing implements GridQueryIndexing
{
}
/**
+ * Parse statement for streamer.
+ *
+ * @param schemaName Schema name.
+ * @param qry Query.
+ * @return DML.
+ */
+ private QueryParserResultDml streamerParse(String schemaName, String qry) {
+ QueryParserResult parseRes = parser.parse(schemaName, new
SqlFieldsQuery(qry), false);
+
+ QueryParserResultDml dml = parseRes.dml();
+
+ if (dml == null || !dml.streamable()) {
+ throw new IgniteSQLException("Streaming mode supports only INSERT
commands without subqueries.",
+ IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+ }
+
+ return dml;
+ }
+
+ /**
* @param size Result size.
* @return List of given size filled with 0Ls.
*/
@@ -880,7 +853,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
throw new IgniteCheckedException("Failed to parse SQL query: " +
sql, e);
}
- H2Utils.bindParameters(stmt, params);
+ bindParameters(stmt, params);
return stmt;
}
@@ -915,7 +888,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
});
}
- Session ses = H2Utils.session(conn);
+ Session ses = session(conn);
if (timeoutMillis > 0)
ses.setQueryTimeout(timeoutMillis);
@@ -1102,99 +1075,6 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
}
/**
- * Initialises MVCC filter and returns MVCC query tracker if needed.
- * @param stmt Prepared statement.
- * @param startTx Start transaction flag.
- * @return MVCC query tracker or {@code null} if MVCC is disabled for
involved caches.
- */
- private MvccQueryTracker mvccTracker(PreparedStatement stmt, boolean
startTx) throws IgniteCheckedException {
- boolean mvccEnabled;
-
- GridCacheContext mvccCacheCtx = null;
-
- try {
- if (stmt.isWrapperFor(PreparedStatementEx.class)) {
- PreparedStatementEx stmtEx =
stmt.unwrap(PreparedStatementEx.class);
-
- Boolean mvccState = stmtEx.meta(MVCC_STATE);
-
- mvccEnabled = mvccState != null ? mvccState : checkMvcc(stmt);
-
- if (mvccEnabled) {
- Integer cacheId = stmtEx.meta(MVCC_CACHE_ID);
-
- assert cacheId != null;
-
- mvccCacheCtx = ctx.cache().context().cacheContext(cacheId);
-
- assert mvccCacheCtx != null;
- }
- }
- else
- mvccEnabled = checkMvcc(stmt);
- }
- catch (SQLException e) {
- throw new IgniteSQLException(e);
- }
-
- assert !mvccEnabled || mvccCacheCtx != null;
-
- return mvccEnabled ? MvccUtils.mvccTracker(mvccCacheCtx, startTx) :
null;
- }
-
- /**
- * Checks if statement uses MVCC caches. If it does, additional metadata
is added to statement.
- *
- * @param stmt Statement to check.
- * @return {@code True} if there MVCC cache involved in statement.
- * @throws SQLException If parser failed.
- */
- private static Boolean checkMvcc(PreparedStatement stmt) throws
SQLException {
- GridSqlQueryParser parser = new GridSqlQueryParser(false);
-
- parser.parse(GridSqlQueryParser.prepared(stmt));
-
- Boolean mvccEnabled = null;
- Integer mvccCacheId = null;
- GridCacheContext ctx0 = null;
-
- for (Object o : parser.objectsMap().values()) {
- if (o instanceof GridSqlAlias)
- o = GridSqlAlias.unwrap((GridSqlAst) o);
- if (o instanceof GridSqlTable && ((GridSqlTable) o).dataTable() !=
null) {
- GridCacheContext cctx =
((GridSqlTable)o).dataTable().cacheContext();
-
- assert cctx != null;
-
- if (mvccEnabled == null) {
- mvccEnabled = cctx.mvccEnabled();
- mvccCacheId = cctx.cacheId();
- ctx0 = cctx;
- }
- else if (mvccEnabled != cctx.mvccEnabled())
-
MvccUtils.throwAtomicityModesMismatchException(ctx0.config(), cctx.config());
- }
- }
-
- if (mvccEnabled == null)
- return false;
-
- // Remember mvccEnabled flag to avoid further additional parsing if
statement obtained from the statement cache.
- if (stmt.isWrapperFor(PreparedStatementEx.class)) {
- PreparedStatementEx stmtEx =
stmt.unwrap(PreparedStatementEx.class);
-
- if (mvccEnabled) {
- stmtEx.putMeta(MVCC_CACHE_ID, mvccCacheId);
- stmtEx.putMeta(MVCC_STATE, Boolean.TRUE);
- }
- else
- stmtEx.putMeta(MVCC_STATE, FALSE);
- }
-
- return mvccEnabled;
- }
-
- /**
* @param schemaName Schema name.
* @param qry Query.
* @param keepCacheObj Flag to keep cache object.
@@ -1273,27 +1153,6 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
}
}
- /**
- * Run DML on remote nodes.
- *
- * @param schemaName Schema name.
- * @param fieldsQry Initial update query.
- * @param cacheIds Cache identifiers.
- * @param isReplicatedOnly Whether query uses only replicated caches.
- * @param cancel Cancel state.
- * @return Update result.
- */
- UpdateResult runDistributedUpdate(
- String schemaName,
- SqlFieldsQuery fieldsQry,
- List<Integer> cacheIds,
- boolean isReplicatedOnly,
- GridQueryCancel cancel) {
- return rdcQryExec.update(schemaName, cacheIds, fieldsQry.getSql(),
fieldsQry.getArgs(),
- fieldsQry.isEnforceJoinOrder(), fieldsQry.getPageSize(),
fieldsQry.getTimeout(),
- fieldsQry.getPartitions(), isReplicatedOnly, cancel);
- }
-
/** {@inheritDoc} */
@SuppressWarnings("deprecation")
@Override public SqlFieldsQuery generateFieldsQuery(String cacheName,
SqlQuery qry) {
@@ -1310,7 +1169,7 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
String sql;
try {
- sql = H2Utils.generateFieldsQueryString(qry.getSql(),
qry.getAlias(), tblDesc);
+ sql = generateFieldsQueryString(qry.getSql(), qry.getAlias(),
tblDesc);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
@@ -1343,14 +1202,14 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
* @param cmd Command (native).
* @return Result.
*/
- public FieldsQueryCursor<List<?>> executeCommand(
+ private FieldsQueryCursor<List<?>> executeCommand(
String schemaName,
SqlFieldsQuery qry,
@Nullable SqlClientContext cliCtx,
QueryParserResultCommand cmd
) {
if (cmd.noOp())
- return H2Utils.zeroCursor();
+ return zeroCursor();
SqlCommand cmdNative = cmd.commandNative();
GridSqlStatement cmdH2 = cmd.commandH2();
@@ -1391,9 +1250,13 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
private void checkClusterState(QueryParserResult parseRes) {
if (!ctx.state().publicApiActiveState(true)) {
if (parseRes.isCommand()) {
- SqlCommand cmd = parseRes.command().commandNative();
+ QueryParserResultCommand cmd = parseRes.command();
+
+ assert cmd != null;
- if (cmd instanceof SqlCommitTransactionCommand || cmd
instanceof SqlRollbackTransactionCommand)
+ SqlCommand cmd0 = cmd.commandNative();
+
+ if (cmd0 instanceof SqlCommitTransactionCommand || cmd0
instanceof SqlRollbackTransactionCommand)
return;
}
@@ -1416,13 +1279,10 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
SqlFieldsQuery remainingQry = qry;
while (remainingQry != null) {
- QueryParserResult parseRes = parser.parse(schemaName,
remainingQry);
+ QueryParserResult parseRes = parser.parse(schemaName,
remainingQry, !failOnMultipleStmts);
remainingQry = parseRes.remainingQuery();
- if (remainingQry != null && failOnMultipleStmts)
- throw new IgniteSQLException("Multiple statements queries
are not supported");
-
SqlFieldsQuery newQry = parseRes.query();
assert newQry.getSql() != null;
@@ -1510,12 +1370,10 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
boolean fail = false;
try {
- Connection conn =
connMgr.connectionForThread().connection(schemaName);
-
if (!loc)
- return executeUpdateDistributed(schemaName, conn, dml ,
qry, cancel);
+ return executeUpdateDistributed(schemaName, dml , qry,
cancel);
else {
- UpdateResult updRes = executeUpdate(schemaName, conn, dml
, qry, true, filter, cancel);
+ UpdateResult updRes = executeUpdate(schemaName, dml , qry,
true, filter, cancel);
return Collections.singletonList(new QueryCursorImpl<>(new
Iterable<List<?>>() {
@SuppressWarnings("NullableProblems")
@@ -1543,6 +1401,8 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
// Distributed query.
GridCacheTwoStepQuery twoStepQry = select.twoStepQuery();
+ assert twoStepQry != null;
+
if (ctx.security().enabled())
checkSecurity(twoStepQry.cacheIds());
@@ -1624,7 +1484,7 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
* @return {@code True} if need to start transaction.
*/
@SuppressWarnings("SimplifiableIfStatement")
- public boolean autoStartTx(SqlFieldsQuery qry) {
+ private boolean autoStartTx(SqlFieldsQuery qry) {
if (!mvccEnabled(ctx))
return false;
@@ -1672,18 +1532,17 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
loc = false;
}
- Connection conn = connMgr.connectionForThread().connection(schema);
+ QueryParserResult parseRes = parser.parse(schema, fldsQry, false);
- H2Utils.setupConnection(conn, false, fldsQry.isEnforceJoinOrder());
+ assert parseRes.remainingQuery() == null;
- PreparedStatement stmt = preparedStatementWithParams(conn,
fldsQry.getSql(),
- F.asList(fldsQry.getArgs()), true);
+ QueryParserResultDml dml = parseRes.dml();
- IndexingQueryFilter filter = backupFilter(topVer, parts);
+ assert dml != null;
- QueryParserResultDml dml = parser.prepareDmlStatement(stmt);
+ IndexingQueryFilter filter = backupFilter(topVer, parts);
- UpdatePlan plan = updatePlan(schema, conn, dml, fldsQry, loc);
+ UpdatePlan plan = updatePlan(schema, dml, fldsQry);
GridCacheContext planCctx = plan.cacheContext();
@@ -1904,22 +1763,15 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
GridQueryCancel cancel,
boolean loc
) throws IgniteCheckedException {
- Connection conn = connMgr.connectionForThread().connection(schemaName);
-
- H2Utils.setupConnection(conn, false, qry.isEnforceJoinOrder());
+ QueryParserResult parseRes = parser.parse(schemaName, qry, false);
- PreparedStatement stmt = preparedStatementWithParams(conn,
qry.getSql(), Arrays.asList(qry.getArgs()), true);
+ assert parseRes.remainingQuery() == null;
- Connection c;
+ QueryParserResultDml dml = parseRes.dml();
- try {
- c = stmt.getConnection();
- }
- catch (SQLException e) {
- throw new IgniteCheckedException(e);
- }
+ assert dml != null;
- return executeUpdate(schemaName, c, parser.prepareDmlStatement(stmt),
qry, loc, filter, cancel);
+ return executeUpdate(schemaName, dml, qry, loc, filter, cancel);
}
/**
@@ -1934,7 +1786,7 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
*/
@Override public boolean registerType(GridCacheContextInfo cacheInfo,
GridQueryTypeDescriptor type, boolean isSql)
throws IgniteCheckedException {
- H2Utils.validateTypeDescriptor(type);
+ validateTypeDescriptor(type);
schemaMgr.onCacheTypeCreated(cacheInfo, this, type, isSql);
return true;
@@ -2116,8 +1968,6 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
schemaMgr = new SchemaManager(ctx, connMgr);
schemaMgr.start(ctx.config().getSqlSchemas());
- valCtx = new CacheQueryObjectValueContext(ctx);
-
nodeId = ctx.localNodeId();
marshaller = ctx.config().getMarshaller();
@@ -2459,7 +2309,6 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
/**
* @param schemaName Schema.
- * @param conn Connection.
* @param dml DML statement.
* @param fieldsQry Initial query
* @param cancel Query cancel.
@@ -2469,7 +2318,6 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
@SuppressWarnings("unchecked")
private List<QueryCursorImpl<List<?>>> executeUpdateDistributed(
String schemaName,
- Connection conn,
QueryParserResultDml dml,
SqlFieldsQuery fieldsQry,
GridQueryCancel cancel
@@ -2481,7 +2329,7 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
List<Object[]> argss = fieldsQry0.batchedArguments();
- UpdatePlan plan = updatePlan(schemaName, conn, dml, fieldsQry0,
false);
+ UpdatePlan plan = updatePlan(schemaName, dml, fieldsQry0);
GridCacheContext<?, ?> cctx = plan.cacheContext();
@@ -2517,7 +2365,7 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
UpdateResult res;
try {
- res = executeUpdate(schemaName, conn, dml, qry0,
false, null, cancel);
+ res = executeUpdate(schemaName, dml, qry0, false,
null, cancel);
cntPerRow[cntr++] = (int)res.counter();
@@ -2556,7 +2404,7 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
return resCurs;
}
else {
- UpdateResult res = executeUpdate(schemaName, conn, dml, fieldsQry,
false, null, cancel);
+ UpdateResult res = executeUpdate(schemaName, dml, fieldsQry,
false, null, cancel);
res.throwIfError();
@@ -2573,7 +2421,6 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
* Execute DML statement, possibly with few re-attempts in case of
concurrent data modifications.
*
* @param schemaName Schema.
- * @param conn Connection.
* @param dml DML command.
* @param fieldsQry Original query.
* @param loc Query locality flag.
@@ -2582,14 +2429,19 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
* @return Update result (modified items count and failed keys).
* @throws IgniteCheckedException if failed.
*/
- private UpdateResult executeUpdate(String schemaName, Connection conn,
QueryParserResultDml dml,
- SqlFieldsQuery fieldsQry, boolean loc, IndexingQueryFilter filters,
GridQueryCancel cancel)
- throws IgniteCheckedException {
+ private UpdateResult executeUpdate(
+ String schemaName,
+ QueryParserResultDml dml,
+ SqlFieldsQuery fieldsQry,
+ boolean loc,
+ IndexingQueryFilter filters,
+ GridQueryCancel cancel
+ ) throws IgniteCheckedException {
Object[] errKeys = null;
long items = 0;
- UpdatePlan plan = updatePlan(schemaName, conn, dml, fieldsQry, loc);
+ UpdatePlan plan = updatePlan(schemaName, dml, fieldsQry);
GridCacheContext<?, ?> cctx = plan.cacheContext();
@@ -2645,12 +2497,12 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
) throws IgniteCheckedException {
GridCacheContext cctx = plan.cacheContext();
+ DmlDistributedPlanInfo distributedPlan = loc ? null :
plan.distributedPlan();
+
if (cctx != null && cctx.mvccEnabled()) {
assert cctx.transactional();
- DmlDistributedPlanInfo distributedPlan = plan.distributedPlan();
-
- GridNearTxLocal tx = tx(cctx.kernalContext());
+ GridNearTxLocal tx = tx(ctx);
boolean implicit = (tx == null);
@@ -2660,7 +2512,7 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
if (implicit)
tx = txStart(cctx, fieldsQry.getTimeout());
- requestSnapshot(cctx, tx);
+ requestSnapshot(tx);
try (GridNearTxLocal toCommit = commit ? tx : null) {
long timeout = implicit
@@ -2758,7 +2610,7 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
if (ex instanceof IgniteException)
throw (IgniteException)ex;
- U.error(log, "Error during update [localNodeId=" +
cctx.localNodeId() + "]", ex);
+ U.error(log, "Error during update [localNodeId=" +
ctx.localNodeId() + "]", ex);
throw new IgniteSQLException("Failed to run update. " +
ex.getMessage(), ex);
}
@@ -2773,16 +2625,22 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
if (fastUpdateRes != null)
return fastUpdateRes;
- if (plan.distributedPlan() != null) {
- DmlDistributedPlanInfo distributedPlan = plan.distributedPlan();
-
- assert distributedPlan != null;
-
+ if (distributedPlan != null) {
if (cancel == null)
cancel = new GridQueryCancel();
- UpdateResult result = runDistributedUpdate(schemaName, fieldsQry,
distributedPlan.getCacheIds(),
- distributedPlan.isReplicatedOnly(), cancel);
+ UpdateResult result = rdcQryExec.update(
+ schemaName,
+ distributedPlan.getCacheIds(),
+ fieldsQry.getSql(),
+ fieldsQry.getArgs(),
+ fieldsQry.isEnforceJoinOrder(),
+ fieldsQry.getPageSize(),
+ fieldsQry.getTimeout(),
+ fieldsQry.getPartitions(),
+ distributedPlan.isReplicatedOnly(),
+ cancel
+ );
// null is returned in case not all nodes support distributed DML.
if (result != null)
@@ -2845,24 +2703,20 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
* Generate SELECT statements to retrieve data for modifications from and
find fast UPDATE or DELETE args,
* if available.
*
- * @param schema Schema.
- * @param conn Connection.
+ * @param schemaName Schema.
* @param dml Command.
* @param fieldsQry Original fields query.
- * @param loc Local query flag.
* @return Update plan.
*/
@SuppressWarnings("IfMayBeConditional")
private UpdatePlan updatePlan(
- String schema,
- Connection conn,
+ String schemaName,
QueryParserResultDml dml,
- SqlFieldsQuery fieldsQry,
- boolean loc
+ SqlFieldsQuery fieldsQry
) throws IgniteCheckedException {
// Disallow updates on SYSTEM schema.
- if (F.eq(QueryUtils.SCHEMA_SYS, schema))
- throw new IgniteSQLException("DML statements are not supported on
" + schema + " schema",
+ if (F.eq(QueryUtils.SCHEMA_SYS, schemaName))
+ throw new IgniteSQLException("DML statements are not supported on
" + schemaName + " schema",
IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
// Disallow updates inside transaction if this is not MVCC mode.
@@ -2875,7 +2729,7 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
"\"IGNITE_ALLOW_DML_INSIDE_TRANSACTION\"");
}
- H2CachedStatementKey planKey = new H2CachedStatementKey(schema,
dml.statement().getSQL(), fieldsQry, loc);
+ H2CachedStatementKey planKey = new H2CachedStatementKey(schemaName,
dml.statement().getSQL(), fieldsQry);
UpdatePlan res = updatePlanCache.get(planKey);
@@ -2883,11 +2737,10 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
return res;
res = UpdatePlanBuilder.planForStatement(
+ schemaName,
dml.statement(),
dml.mvccEnabled(),
- loc,
this,
- conn,
fieldsQry
);
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/PreparedStatementEx.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/PreparedStatementEx.java
deleted file mode 100644
index 50dd892..0000000
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/PreparedStatementEx.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2;
-
-import java.sql.PreparedStatement;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.jetbrains.annotations.Nullable;
-
-/**
- *
- */
-public interface PreparedStatementEx extends PreparedStatement {
- /** */
- static final AtomicInteger metaIdGenerator = new AtomicInteger();
-
- /** Flag if at least one MVCC cache is used in this statement. */
- static final int MVCC_STATE = metaIdGenerator.getAndIncrement();
-
- /** First mvcc cache id of the involved caches. */
- static final int MVCC_CACHE_ID = metaIdGenerator.getAndIncrement();
-
- /**
- * @param id Metadata key.
- * @return Attached metadata.
- */
- @Nullable <T> T meta(int id);
-
- /**
- * @param id Metadata key.
- * @param metaObj Metadata object.
- */
- void putMeta(int id, Object metaObj);
-}
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/PreparedStatementExImpl.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/PreparedStatementExImpl.java
deleted file mode 100644
index e98e537..0000000
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/PreparedStatementExImpl.java
+++ /dev/null
@@ -1,648 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2;
-
-import java.io.InputStream;
-import java.io.Reader;
-import java.math.BigDecimal;
-import java.net.URL;
-import java.sql.Array;
-import java.sql.Blob;
-import java.sql.Clob;
-import java.sql.Connection;
-import java.sql.Date;
-import java.sql.NClob;
-import java.sql.ParameterMetaData;
-import java.sql.PreparedStatement;
-import java.sql.Ref;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.RowId;
-import java.sql.SQLException;
-import java.sql.SQLType;
-import java.sql.SQLWarning;
-import java.sql.SQLXML;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.util.Arrays;
-import java.util.Calendar;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * PreparedStatement with extended capability to store additional meta
information.
- */
-@SuppressWarnings("unchecked")
-final class PreparedStatementExImpl implements PreparedStatementEx {
- /** */
- private final PreparedStatement delegate;
-
- /** */
- private Object[] meta = null;
-
- /**
- * @param delegate Wrapped statement.
- */
- public PreparedStatementExImpl(PreparedStatement delegate) {
- this.delegate = delegate;
- }
-
- /** {@inheritDoc} */
- @Override public ResultSet executeQuery() throws SQLException {
- return delegate.executeQuery();
- }
-
- /** {@inheritDoc} */
- @Override public int executeUpdate() throws SQLException {
- return delegate.executeUpdate();
- }
-
- /** {@inheritDoc} */
- @Override public void setNull(int parameterIndex, int sqlType) throws
SQLException {
- delegate.setNull(parameterIndex, sqlType);
- }
-
- /** {@inheritDoc} */
- @Override public void setBoolean(int parameterIndex, boolean x) throws
SQLException {
- delegate.setBoolean(parameterIndex, x);
- }
-
- /** {@inheritDoc} */
- @Override public void setByte(int parameterIndex, byte x) throws
SQLException {
- delegate.setByte(parameterIndex, x);
- }
-
- /** {@inheritDoc} */
- @Override public void setShort(int parameterIndex, short x) throws
SQLException {
- delegate.setShort(parameterIndex, x);
- }
-
- /** {@inheritDoc} */
- @Override public void setInt(int parameterIndex, int x) throws
SQLException {
- delegate.setInt(parameterIndex, x);
- }
-
- /** {@inheritDoc} */
- @Override public void setLong(int parameterIndex, long x) throws
SQLException {
- delegate.setLong(parameterIndex, x);
- }
-
- /** {@inheritDoc} */
- @Override public void setFloat(int parameterIndex, float x) throws
SQLException {
- delegate.setFloat(parameterIndex, x);
- }
-
- /** {@inheritDoc} */
- @Override public void setDouble(int parameterIndex, double x) throws
SQLException {
- delegate.setDouble(parameterIndex, x);
- }
-
- /** {@inheritDoc} */
- @Override public void setBigDecimal(int parameterIndex, BigDecimal x)
throws SQLException {
- delegate.setBigDecimal(parameterIndex, x);
- }
-
- /** {@inheritDoc} */
- @Override public void setString(int parameterIndex, String x) throws
SQLException {
- delegate.setString(parameterIndex, x);
- }
-
- /** {@inheritDoc} */
- @Override public void setBytes(int parameterIndex, byte[] x) throws
SQLException {
- delegate.setBytes(parameterIndex, x);
- }
-
- /** {@inheritDoc} */
- @Override public void setDate(int parameterIndex, Date x) throws
SQLException {
- delegate.setDate(parameterIndex, x);
- }
-
- /** {@inheritDoc} */
- @Override public void setTime(int parameterIndex, Time x) throws
SQLException {
- delegate.setTime(parameterIndex, x);
- }
-
- /** {@inheritDoc} */
- @Override public void setTimestamp(int parameterIndex, Timestamp x) throws
SQLException {
- delegate.setTimestamp(parameterIndex, x);
- }
-
- /** {@inheritDoc} */
- @Override public void setAsciiStream(int parameterIndex, InputStream x,
int length) throws SQLException {
- delegate.setAsciiStream(parameterIndex, x, length);
- }
-
- /** {@inheritDoc} */
- @Deprecated
- @Override public void setUnicodeStream(int parameterIndex, InputStream x,
int length) throws SQLException {
- delegate.setUnicodeStream(parameterIndex, x, length);
- }
-
- /** {@inheritDoc} */
- @Override public void setBinaryStream(int parameterIndex, InputStream x,
int length) throws SQLException {
- delegate.setBinaryStream(parameterIndex, x, length);
- }
-
- /** {@inheritDoc} */
- @Override public void clearParameters() throws SQLException {
- delegate.clearParameters();
- }
-
- /** {@inheritDoc} */
- @Override public void setObject(int parameterIndex, Object x, int
targetSqlType) throws SQLException {
- delegate.setObject(parameterIndex, x, targetSqlType);
- }
-
- /** {@inheritDoc} */
- @Override public void setObject(int parameterIndex, Object x) throws
SQLException {
- delegate.setObject(parameterIndex, x);
- }
-
- /** {@inheritDoc} */
- @Override public boolean execute() throws SQLException {
- return delegate.execute();
- }
-
- /** {@inheritDoc} */
- @Override public void addBatch() throws SQLException {
- delegate.addBatch();
- }
-
- /** {@inheritDoc} */
- @Override public void setCharacterStream(int parameterIndex, Reader
reader, int length) throws SQLException {
- delegate.setCharacterStream(parameterIndex, reader, length);
- }
-
- /** {@inheritDoc} */
- @Override public void setRef(int parameterIndex, Ref x) throws
SQLException {
- delegate.setRef(parameterIndex, x);
- }
-
- /** {@inheritDoc} */
- @Override public void setBlob(int parameterIndex, Blob x) throws
SQLException {
- delegate.setBlob(parameterIndex, x);
- }
-
- /** {@inheritDoc} */
- @Override public void setClob(int parameterIndex, Clob x) throws
SQLException {
- delegate.setClob(parameterIndex, x);
- }
-
- /** {@inheritDoc} */
- @Override public void setArray(int parameterIndex, Array x) throws
SQLException {
- delegate.setArray(parameterIndex, x);
- }
-
- /** {@inheritDoc} */
- @Override public ResultSetMetaData getMetaData() throws SQLException {
- return delegate.getMetaData();
- }
-
- /** {@inheritDoc} */
- @Override public void setDate(int parameterIndex, Date x, Calendar cal)
throws SQLException {
- delegate.setDate(parameterIndex, x, cal);
- }
-
- /** {@inheritDoc} */
- @Override public void setTime(int parameterIndex, Time x, Calendar cal)
throws SQLException {
- delegate.setTime(parameterIndex, x, cal);
- }
-
- /** {@inheritDoc} */
- @Override public void setTimestamp(int parameterIndex, Timestamp x,
Calendar cal) throws SQLException {
- delegate.setTimestamp(parameterIndex, x, cal);
- }
-
- /** {@inheritDoc} */
- @Override public void setNull(int parameterIndex, int sqlType, String
typeName) throws SQLException {
- delegate.setNull(parameterIndex, sqlType, typeName);
- }
-
- /** {@inheritDoc} */
- @Override public void setURL(int parameterIndex, URL x) throws
SQLException {
- delegate.setURL(parameterIndex, x);
- }
-
- /** {@inheritDoc} */
- @Override public ParameterMetaData getParameterMetaData() throws
SQLException {
- return delegate.getParameterMetaData();
- }
-
- /** {@inheritDoc} */
- @Override public void setRowId(int parameterIndex, RowId x) throws
SQLException {
- delegate.setRowId(parameterIndex, x);
- }
-
- /** {@inheritDoc} */
- @Override public void setNString(int parameterIndex, String value) throws
SQLException {
- delegate.setNString(parameterIndex, value);
- }
-
- /** {@inheritDoc} */
- @Override public void setNCharacterStream(int parameterIndex, Reader
value, long length) throws SQLException {
- delegate.setNCharacterStream(parameterIndex, value, length);
- }
-
- /** {@inheritDoc} */
- @Override public void setNClob(int parameterIndex, NClob value) throws
SQLException {
- delegate.setNClob(parameterIndex, value);
- }
-
- /** {@inheritDoc} */
- @Override public void setClob(int parameterIndex, Reader reader, long
length) throws SQLException {
- delegate.setClob(parameterIndex, reader, length);
- }
-
- /** {@inheritDoc} */
- @Override public void setBlob(int parameterIndex, InputStream inputStream,
long length) throws SQLException {
- delegate.setBlob(parameterIndex, inputStream, length);
- }
-
- /** {@inheritDoc} */
- @Override public void setNClob(int parameterIndex, Reader reader, long
length) throws SQLException {
- delegate.setNClob(parameterIndex, reader, length);
- }
-
- /** {@inheritDoc} */
- @Override public void setSQLXML(int parameterIndex, SQLXML xmlObject)
throws SQLException {
- delegate.setSQLXML(parameterIndex, xmlObject);
- }
-
- /** {@inheritDoc} */
- @Override public void setObject(int parameterIndex, Object x, int
targetSqlType, int scaleOrLength) throws SQLException {
- delegate.setObject(parameterIndex, x, targetSqlType, scaleOrLength);
- }
-
- /** {@inheritDoc} */
- @Override public void setAsciiStream(int parameterIndex, InputStream x,
long length) throws SQLException {
- delegate.setAsciiStream(parameterIndex, x, length);
- }
-
- /** {@inheritDoc} */
- @Override public void setBinaryStream(int parameterIndex, InputStream x,
long length) throws SQLException {
- delegate.setBinaryStream(parameterIndex, x, length);
- }
-
- /** {@inheritDoc} */
- @Override public void setCharacterStream(int parameterIndex, Reader
reader, long length) throws SQLException {
- delegate.setCharacterStream(parameterIndex, reader, length);
- }
-
- /** {@inheritDoc} */
- @Override public void setAsciiStream(int parameterIndex, InputStream x)
throws SQLException {
- delegate.setAsciiStream(parameterIndex, x);
- }
-
- /** {@inheritDoc} */
- @Override public void setBinaryStream(int parameterIndex, InputStream x)
throws SQLException {
- delegate.setBinaryStream(parameterIndex, x);
- }
-
- /** {@inheritDoc} */
- @Override public void setCharacterStream(int parameterIndex, Reader
reader) throws SQLException {
- delegate.setCharacterStream(parameterIndex, reader);
- }
-
- /** {@inheritDoc} */
- @Override public void setNCharacterStream(int parameterIndex, Reader
value) throws SQLException {
- delegate.setNCharacterStream(parameterIndex, value);
- }
-
- /** {@inheritDoc} */
- @Override public void setClob(int parameterIndex, Reader reader) throws
SQLException {
- delegate.setClob(parameterIndex, reader);
- }
-
- /** {@inheritDoc} */
- @Override public void setBlob(int parameterIndex, InputStream inputStream)
throws SQLException {
- delegate.setBlob(parameterIndex, inputStream);
- }
-
- /** {@inheritDoc} */
- @Override public void setNClob(int parameterIndex, Reader reader) throws
SQLException {
- delegate.setNClob(parameterIndex, reader);
- }
-
- /** {@inheritDoc} */
- @Override public void setObject(int parameterIndex, Object x, SQLType
targetSqlType, int scaleOrLength) throws SQLException {
- delegate.setObject(parameterIndex, x, targetSqlType, scaleOrLength);
- }
-
- /** {@inheritDoc} */
- @Override public void setObject(int parameterIndex, Object x, SQLType
targetSqlType) throws SQLException {
- delegate.setObject(parameterIndex, x, targetSqlType);
- }
-
- /** {@inheritDoc} */
- @Override public long executeLargeUpdate() throws SQLException {
- return delegate.executeLargeUpdate();
- }
-
- /** {@inheritDoc} */
- @Override public ResultSet executeQuery(String sql) throws SQLException {
- return delegate.executeQuery(sql);
- }
-
- /** {@inheritDoc} */
- @Override public int executeUpdate(String sql) throws SQLException {
- return delegate.executeUpdate(sql);
- }
-
- /** {@inheritDoc} */
- @Override public void close() throws SQLException {
- delegate.close();
- }
-
- /** {@inheritDoc} */
- @Override public int getMaxFieldSize() throws SQLException {
- return delegate.getMaxFieldSize();
- }
-
- /** {@inheritDoc} */
- @Override public void setMaxFieldSize(int max) throws SQLException {
- delegate.setMaxFieldSize(max);
- }
-
- /** {@inheritDoc} */
- @Override public int getMaxRows() throws SQLException {
- return delegate.getMaxRows();
- }
-
- /** {@inheritDoc} */
- @Override public void setMaxRows(int max) throws SQLException {
- delegate.setMaxRows(max);
- }
-
- /** {@inheritDoc} */
- @Override public void setEscapeProcessing(boolean enable) throws
SQLException {
- delegate.setEscapeProcessing(enable);
- }
-
- /** {@inheritDoc} */
- @Override public int getQueryTimeout() throws SQLException {
- return delegate.getQueryTimeout();
- }
-
- /** {@inheritDoc} */
- @Override public void setQueryTimeout(int seconds) throws SQLException {
- delegate.setQueryTimeout(seconds);
- }
-
- /** {@inheritDoc} */
- @Override public void cancel() throws SQLException {
- delegate.cancel();
- }
-
- /** {@inheritDoc} */
- @Override public SQLWarning getWarnings() throws SQLException {
- return delegate.getWarnings();
- }
-
- /** {@inheritDoc} */
- @Override public void clearWarnings() throws SQLException {
- delegate.clearWarnings();
- }
-
- /** {@inheritDoc} */
- @Override public void setCursorName(String name) throws SQLException {
- delegate.setCursorName(name);
- }
-
- /** {@inheritDoc} */
- @Override public boolean execute(String sql) throws SQLException {
- return delegate.execute(sql);
- }
-
- /** {@inheritDoc} */
- @Override public ResultSet getResultSet() throws SQLException {
- return delegate.getResultSet();
- }
-
- /** {@inheritDoc} */
- @Override public int getUpdateCount() throws SQLException {
- return delegate.getUpdateCount();
- }
-
- /** {@inheritDoc} */
- @Override public boolean getMoreResults() throws SQLException {
- return delegate.getMoreResults();
- }
-
- /** {@inheritDoc} */
- @Override public int getFetchDirection() throws SQLException {
- return delegate.getFetchDirection();
- }
-
- /** {@inheritDoc} */
- @Override public void setFetchDirection(int direction) throws SQLException
{
- delegate.setFetchDirection(direction);
- }
-
- /** {@inheritDoc} */
- @Override public int getFetchSize() throws SQLException {
- return delegate.getFetchSize();
- }
-
- /** {@inheritDoc} */
- @Override public void setFetchSize(int rows) throws SQLException {
- delegate.setFetchSize(rows);
- }
-
- /** {@inheritDoc} */
- @Override public int getResultSetConcurrency() throws SQLException {
- return delegate.getResultSetConcurrency();
- }
-
- /** {@inheritDoc} */
- @Override public int getResultSetType() throws SQLException {
- return delegate.getResultSetType();
- }
-
- /** {@inheritDoc} */
- @Override public void addBatch(String sql) throws SQLException {
- delegate.addBatch(sql);
- }
-
- /** {@inheritDoc} */
- @Override public void clearBatch() throws SQLException {
- delegate.clearBatch();
- }
-
- /** {@inheritDoc} */
- @Override public int[] executeBatch() throws SQLException {
- return delegate.executeBatch();
- }
-
- /** {@inheritDoc} */
- @Override public Connection getConnection() throws SQLException {
- return delegate.getConnection();
- }
-
- /** {@inheritDoc} */
- @Override public boolean getMoreResults(int current) throws SQLException {
- return delegate.getMoreResults(current);
- }
-
- /** {@inheritDoc} */
- @Override public ResultSet getGeneratedKeys() throws SQLException {
- return delegate.getGeneratedKeys();
- }
-
- /** {@inheritDoc} */
- @Override public int executeUpdate(String sql, int autoGeneratedKeys)
throws SQLException {
- return delegate.executeUpdate(sql, autoGeneratedKeys);
- }
-
- /** {@inheritDoc} */
- @Override public int executeUpdate(String sql, int[] columnIndexes) throws
SQLException {
- return delegate.executeUpdate(sql, columnIndexes);
- }
-
- /** {@inheritDoc} */
- @Override public int executeUpdate(String sql, String[] columnNames)
throws SQLException {
- return delegate.executeUpdate(sql, columnNames);
- }
-
- /** {@inheritDoc} */
- @Override public boolean execute(String sql, int autoGeneratedKeys) throws
SQLException {
- return delegate.execute(sql, autoGeneratedKeys);
- }
-
- /** {@inheritDoc} */
- @Override public boolean execute(String sql, int[] columnIndexes) throws
SQLException {
- return delegate.execute(sql, columnIndexes);
- }
-
- /** {@inheritDoc} */
- @Override public boolean execute(String sql, String[] columnNames) throws
SQLException {
- return delegate.execute(sql, columnNames);
- }
-
- /** {@inheritDoc} */
- @Override public int getResultSetHoldability() throws SQLException {
- return delegate.getResultSetHoldability();
- }
-
- /** {@inheritDoc} */
- @Override public boolean isClosed() throws SQLException {
- return delegate.isClosed();
- }
-
- /** {@inheritDoc} */
- @Override public boolean isPoolable() throws SQLException {
- return delegate.isPoolable();
- }
-
- /** {@inheritDoc} */
- @Override public void setPoolable(boolean poolable) throws SQLException {
- delegate.setPoolable(poolable);
- }
-
- /** {@inheritDoc} */
- @Override public void closeOnCompletion() throws SQLException {
- delegate.closeOnCompletion();
- }
-
- /** {@inheritDoc} */
- @Override public boolean isCloseOnCompletion() throws SQLException {
- return delegate.isCloseOnCompletion();
- }
-
- /** {@inheritDoc} */
- @Override public long getLargeUpdateCount() throws SQLException {
- return delegate.getLargeUpdateCount();
- }
-
- /** {@inheritDoc} */
- @Override public long getLargeMaxRows() throws SQLException {
- return delegate.getLargeMaxRows();
- }
-
- /** {@inheritDoc} */
- @Override public void setLargeMaxRows(long max) throws SQLException {
- delegate.setLargeMaxRows(max);
- }
-
- /** {@inheritDoc} */
- @Override public long[] executeLargeBatch() throws SQLException {
- return delegate.executeLargeBatch();
- }
-
- /** {@inheritDoc} */
- @Override public long executeLargeUpdate(String sql) throws SQLException {
- return delegate.executeLargeUpdate(sql);
- }
-
- /** {@inheritDoc} */
- @Override public long executeLargeUpdate(String sql, int
autoGeneratedKeys) throws SQLException {
- return delegate.executeLargeUpdate(sql, autoGeneratedKeys);
- }
-
- /** {@inheritDoc} */
- @Override public long executeLargeUpdate(String sql, int[] columnIndexes)
throws SQLException {
- return delegate.executeLargeUpdate(sql, columnIndexes);
- }
-
- /** {@inheritDoc} */
- @Override public long executeLargeUpdate(String sql, String[] columnNames)
throws SQLException {
- return delegate.executeLargeUpdate(sql, columnNames);
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public <T> T unwrap(Class<T> iface) throws SQLException {
- if (iface == PreparedStatementExImpl.class || iface ==
PreparedStatementEx.class)
- return (T)this;
-
- return delegate.unwrap(iface);
- }
-
- /** {@inheritDoc} */
- @Override public boolean isWrapperFor(Class<?> iface) throws SQLException {
- return iface == PreparedStatementExImpl.class
- || iface == PreparedStatementEx.class
- || delegate.isWrapperFor(iface);
- }
-
- /** {@inheritDoc} */
- @Override @Nullable public <T> T meta(int id) {
- return meta != null && id < meta.length ? (T)meta[id] : null;
- }
-
- /** {@inheritDoc} */
- @Override public void putMeta(int id, Object metaObj) {
- if (meta == null)
- meta = new Object[id + 1];
- else if (meta.length <= id)
- meta = Arrays.copyOf(meta, id + 1);
-
- meta[id] = metaObj;
- }
-
- /**
- *
- * @param stmt Prepared statement to wrap.
- * @return Wrapped statement.
- */
- public static PreparedStatement wrap(@NotNull PreparedStatement stmt) {
- if (stmt.getClass() == PreparedStatementExImpl.class)
- return stmt;
-
- return new PreparedStatementExImpl(stmt);
- }
-}
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java
index 13810cb..029a77c 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.query.h2;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheAtomicityMode;
@@ -31,11 +30,16 @@ import
org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.h2.dml.DmlAstUtils;
import org.apache.ignite.internal.processors.query.h2.dml.DmlUtils;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlAlias;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlAst;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlInsert;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlStatement;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlTable;
import org.apache.ignite.internal.sql.SqlParseException;
import org.apache.ignite.internal.sql.SqlParser;
import org.apache.ignite.internal.sql.SqlStrictParseException;
@@ -62,6 +66,7 @@ import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.regex.Pattern;
/**
@@ -72,7 +77,7 @@ public class QueryParser {
private static final int CACHE_SIZE = 1024;
/** A pattern for commands having internal implementation in Ignite. */
- public static final Pattern INTERNAL_CMD_RE = Pattern.compile(
+ private static final Pattern INTERNAL_CMD_RE = Pattern.compile(
"^(create|drop)\\s+index|^alter\\s+table|^copy|^set|^begin|^commit|^rollback|^(create|alter|drop)\\s+user",
Pattern.CASE_INSENSITIVE);
@@ -107,10 +112,11 @@ public class QueryParser {
*
* @param schemaName schema name.
* @param qry query to parse.
+ * @param remainingAllowed Whether multiple statements are allowed.
* @return Parsing result that contains Parsed leading query and remaining
sql script.
*/
- public QueryParserResult parse(String schemaName, SqlFieldsQuery qry) {
- QueryParserResult res = parse0(schemaName, qry);
+ public QueryParserResult parse(String schemaName, SqlFieldsQuery qry,
boolean remainingAllowed) {
+ QueryParserResult res = parse0(schemaName, qry, remainingAllowed);
checkQueryType(qry, res.isSelect());
@@ -122,9 +128,10 @@ public class QueryParser {
*
* @param schemaName schema name.
* @param qry query to parse.
+ * @param remainingAllowed Whether multiple statements are allowed.
* @return Parsing result that contains Parsed leading query and remaining
sql script.
*/
- private QueryParserResult parse0(String schemaName, SqlFieldsQuery qry) {
+ private QueryParserResult parse0(String schemaName, SqlFieldsQuery qry,
boolean remainingAllowed) {
// First, let's check if we already have a two-step query for this
statement...
QueryParserCacheKey cachedKey = new QueryParserCacheKey(
schemaName,
@@ -137,15 +144,15 @@ public class QueryParser {
QueryParserCacheEntry cached = cache.get(cachedKey);
- if (cached != null)
+ if (cached != null)
return new QueryParserResult(qry, null, cached.select(),
cached.dml(), cached.command());
// Try parting as native command.
- QueryParserResult parseRes = parseNative(schemaName, qry);
+ QueryParserResult parseRes = parseNative(schemaName, qry,
remainingAllowed);
// Otherwise parse with H2.
if (parseRes == null)
- parseRes = parseH2(schemaName, qry);
+ parseRes = parseH2(schemaName, qry, remainingAllowed);
// Add to cache if not multi-statement.
if (parseRes.remainingQuery() == null) {
@@ -164,11 +171,12 @@ public class QueryParser {
*
* @param schemaName Schema name.
* @param qry which sql text to parse.
+ * @param remainingAllowed Whether multiple statements are allowed.
* @return Command or {@code null} if cannot parse this query.
*/
@SuppressWarnings("IfMayBeConditional")
@Nullable
- private QueryParserResult parseNative(String schemaName, SqlFieldsQuery
qry) {
+ private QueryParserResult parseNative(String schemaName, SqlFieldsQuery
qry, boolean remainingAllowed) {
String sql = qry.getSql();
// Heuristic check for fast return.
@@ -198,12 +206,13 @@ public class QueryParser {
SqlFieldsQuery newQry =
cloneFieldsQuery(qry).setSql(parser.lastCommandSql());
- SqlFieldsQuery remainingQry;
-
- if (F.isEmpty(parser.remainingSql()))
- remainingQry = null;
- else
+ SqlFieldsQuery remainingQry = null;
+
+ if (!F.isEmpty(parser.remainingSql())) {
+ checkRemainingAllowed(remainingAllowed);
+
remainingQry =
cloneFieldsQuery(qry).setSql(parser.remainingSql()).setArgs(qry.getArgs());
+ }
QueryParserResultCommand cmd = new
QueryParserResultCommand(nativeCmd, null, false);
@@ -234,10 +243,11 @@ public class QueryParser {
*
* @param schemaName Schema name.
* @param qry Query.
+ * @param remainingAllowed Whether multiple statements are allowed.
* @return Parsing result.
*/
@SuppressWarnings("IfMayBeConditional")
- private QueryParserResult parseH2(String schemaName, SqlFieldsQuery qry) {
+ private QueryParserResult parseH2(String schemaName, SqlFieldsQuery qry,
boolean remainingAllowed) {
Connection c = connMgr.connectionForThread().connection(schemaName);
// For queries that are explicitly local, we rely on the flag
specified in the query
@@ -258,151 +268,208 @@ public class QueryParser {
IgniteQueryErrorCode.PARSING, e);
}
- if (qry.isLocal() && GridSqlQueryParser.checkMultipleStatements(stmt))
- throw new IgniteSQLException("Multiple statements queries are not
supported for local queries.",
- IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
-
- GridSqlQueryParser.PreparedWithRemaining prep =
GridSqlQueryParser.preparedWithRemaining(stmt);
-
- Prepared prepared = prep.prepared();
-
- if (GridSqlQueryParser.isExplainUpdate(prepared))
- throw new IgniteSQLException("Explains of update queries are not
supported.",
- IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+ try {
+ if (qry.isLocal() &&
GridSqlQueryParser.checkMultipleStatements(stmt))
+ throw new IgniteSQLException("Multiple statements queries are
not supported for local queries.",
+ IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
- int paramsCnt = prepared.getParameters().size();
+ GridSqlQueryParser.PreparedWithRemaining prep =
GridSqlQueryParser.preparedWithRemaining(stmt);
- Object[] argsOrig = qry.getArgs();
+ Prepared prepared = prep.prepared();
- Object[] args = null;
- Object[] remainingArgs = null;
+ if (GridSqlQueryParser.isExplainUpdate(prepared))
+ throw new IgniteSQLException("Explains of update queries are
not supported.",
+ IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
- if (!DmlUtils.isBatched(qry) && paramsCnt > 0) {
- if (argsOrig == null || argsOrig.length < paramsCnt) {
- throw new IgniteException("Invalid number of query parameters.
" +
- "Cannot find " + (argsOrig != null ? argsOrig.length + 1 :
1) + " parameter.");
+ // Get remaining query and check if it is allowed.
+ SqlFieldsQuery remainingQry = null;
+
+ if (!F.isEmpty(prep.remainingSql())) {
+ checkRemainingAllowed(remainingAllowed);
+
+ remainingQry =
cloneFieldsQuery(qry).setSql(prep.remainingSql());
}
- args = Arrays.copyOfRange(argsOrig, 0, paramsCnt);
+ // Prepare new query.
+ SqlFieldsQuery newQry =
cloneFieldsQuery(qry).setSql(prepared.getSQL());
- if (paramsCnt != argsOrig.length)
- remainingArgs = Arrays.copyOfRange(argsOrig, paramsCnt,
argsOrig.length);
- }
- else
- remainingArgs = argsOrig;
+ int paramsCnt = prepared.getParameters().size();
- SqlFieldsQuery remainingQry;
+ Object[] argsOrig = qry.getArgs();
- if (F.isEmpty(prep.remainingSql()))
- remainingQry = null;
- else
- remainingQry =
cloneFieldsQuery(qry).setSql(prep.remainingSql()).setArgs(remainingArgs);
+ Object[] args = null;
+ Object[] remainingArgs = null;
- SqlFieldsQuery newQry =
cloneFieldsQuery(qry).setSql(prepared.getSQL()).setArgs(args);
+ if (!DmlUtils.isBatched(qry) && paramsCnt > 0) {
+ if (argsOrig == null || argsOrig.length < paramsCnt)
+ // Not enough parameters, but we will handle this later on
execution phase.
+ args = argsOrig;
+ else {
+ args = Arrays.copyOfRange(argsOrig, 0, paramsCnt);
- if (CommandProcessor.isCommand(prepared)) {
- GridSqlStatement cmdH2 = new
GridSqlQueryParser(false).parse(prepared);
+ if (paramsCnt != argsOrig.length)
+ remainingArgs = Arrays.copyOfRange(argsOrig,
paramsCnt, argsOrig.length);
+ }
+ }
+ else
+ remainingArgs = argsOrig;
- QueryParserResultCommand cmd = new QueryParserResultCommand(null,
cmdH2, false);
+ newQry.setArgs(args);
- return new QueryParserResult(newQry, remainingQry, null, null,
cmd);
- }
- else if (CommandProcessor.isCommandNoOp(prepared)) {
- QueryParserResultCommand cmd = new QueryParserResultCommand(null,
null, true);
+ if (remainingQry != null)
+ remainingQry.setArgs(remainingArgs);
- return new QueryParserResult(newQry, remainingQry, null, null,
cmd);
- }
- else if (GridSqlQueryParser.isDml(prepared)) {
- QueryParserResultDml dml = prepareDmlStatement(prepared);
-
- return new QueryParserResult(newQry, remainingQry, null, dml,
null);
- }
- else if (!prepared.isQuery()) {
- throw new IgniteSQLException("Unsupported statement: " +
newQry.getSql(),
- IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
- }
-
- // Parse SELECT. Split is required either if query is distirubted, or
when it is local, but executed
- // over segmented PARTITIONED case. In this case multiple map queries
will be executed against local
- // node stripes in parallel and then merged through reduce process.
+ // Do actual parsing.
+ if (CommandProcessor.isCommand(prepared)) {
+ GridSqlStatement cmdH2 = new
GridSqlQueryParser(false).parse(prepared);
+ QueryParserResultCommand cmd = new
QueryParserResultCommand(null, cmdH2, false);
- // Calculate if query is in fact can be executed locally.
- boolean loc = qry.isLocal();
+ return new QueryParserResult(newQry, remainingQry, null, null,
cmd);
+ }
+ else if (CommandProcessor.isCommandNoOp(prepared)) {
+ QueryParserResultCommand cmd = new
QueryParserResultCommand(null, null, true);
- GridSqlQueryParser parser = null;
+ return new QueryParserResult(newQry, remainingQry, null, null,
cmd);
+ }
+ else if (GridSqlQueryParser.isDml(prepared)) {
+ QueryParserResultDml dml = prepareDmlStatement(prepared);
- if (!loc) {
- parser = new GridSqlQueryParser(false);
+ return new QueryParserResult(newQry, remainingQry, null, dml,
null);
+ }
+ else if (!prepared.isQuery()) {
+ throw new IgniteSQLException("Unsupported statement: " +
newQry.getSql(),
+ IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+ }
- parser.parse(prepared);
+ // Parse SELECT.
+ GridSqlQueryParser parser = new GridSqlQueryParser(false);
- if (parser.isLocalQuery())
- loc = true;
- }
+ GridSqlStatement stmt0 = parser.parse(prepared);
- // If this is a local query, check if it must be split.
- boolean locSplit = false;
+ List<Integer> cacheIds = parser.cacheIds();
+ Integer mvccCacheId = mvccCacheIdForSelect(parser.objectsMap());
- if (loc) {
- if (parser == null) {
- parser = new GridSqlQueryParser(false);
+ // Calculate if query is in fact can be executed locally.
+ boolean loc = qry.isLocal();
- parser.parse(prepared);
+ if (!loc) {
+ if (parser.isLocalQuery())
+ loc = true;
}
- GridCacheContext cctx = parser.getFirstPartitionedCache();
-
- if (cctx != null && cctx.config().getQueryParallelism() > 1)
- locSplit = true;
- }
+ // If this is a local query, check if it must be split.
+ boolean locSplit = false;
- boolean splitNeeded = !loc || locSplit;
+ if (loc) {
+ GridCacheContext cctx = parser.getFirstPartitionedCache();
- try {
- GridCacheTwoStepQuery twoStepQry = null;
-
- if (splitNeeded) {
- twoStepQry = GridSqlQuerySplitter.split(
-
connMgr.connectionForThread().connection(newQry.getSchema()),
- prepared,
- newQry.getArgs(),
- newQry.isCollocated(),
- newQry.isDistributedJoins(),
- newQry.isEnforceJoinOrder(),
- locSplit,
- idx
- );
+ if (cctx != null && cctx.config().getQueryParallelism() > 1)
+ locSplit = true;
}
- List<GridQueryFieldMetadata> meta =
H2Utils.meta(stmt.getMetaData());
-
- QueryParserResultSelect select = new
QueryParserResultSelect(twoStepQry, locSplit, meta);
+ // Split is required either if query is distributed, or when it is
local, but executed
+ // over segmented PARTITIONED case. In this case multiple map
queries will be executed against local
+ // node stripes in parallel and then merged through reduce process.
+ boolean splitNeeded = !loc || locSplit;
+
+ try {
+ GridCacheTwoStepQuery twoStepQry = null;
+
+ if (splitNeeded) {
+ twoStepQry = GridSqlQuerySplitter.split(
+
connMgr.connectionForThread().connection(newQry.getSchema()),
+ prepared,
+ newQry.getArgs(),
+ newQry.isCollocated(),
+ newQry.isDistributedJoins(),
+ newQry.isEnforceJoinOrder(),
+ locSplit,
+ idx
+ );
+ }
+
+ List<GridQueryFieldMetadata> meta =
H2Utils.meta(stmt.getMetaData());
+
+ boolean forUpdate =
GridSqlQueryParser.isForUpdateQuery(prepared);
+
+ QueryParserResultSelect select = new QueryParserResultSelect(
+ stmt0,
+ twoStepQry,
+ meta,
+ cacheIds,
+ mvccCacheId,
+ forUpdate
+ );
- return new QueryParserResult(newQry, remainingQry, select, null,
null);
- }
- catch (IgniteCheckedException e) {
- throw new IgniteSQLException("Failed to parse query: " +
newQry.getSql(), IgniteQueryErrorCode.PARSING, e);
- }
- catch (SQLException e) {
- throw new IgniteSQLException(e);
+ return new QueryParserResult(newQry, remainingQry, select,
null, null);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteSQLException("Failed to parse query: " +
newQry.getSql(), IgniteQueryErrorCode.PARSING,
+ e);
+ }
+ catch (SQLException e) {
+ throw new IgniteSQLException(e);
+ }
}
finally {
- // TODO: Leak if returned earlier. Will be fixed in
https://issues.apache.org/jira/browse/IGNITE-11279
U.close(stmt, log);
}
}
/**
- * Prepare DML statement.
+ * Throw exception is multiple statements are not allowed.
+ *
+ * @param allowed Whether multiple statements are allowed.
+ */
+ private static void checkRemainingAllowed(boolean allowed) {
+ if (allowed)
+ return;
+
+ throw new IgniteSQLException("Multiple statements queries are not
supported.",
+ IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+ }
+
+ /**
+ * Get ID of the first MVCC cache for SELECT.
*
- * @param preparedStmt Prepared statement.
- * @return Statement.
+ * @param objMap Object map.
+ * @return ID of the first MVCC cache or {@code null} if no MVCC caches
involved.
*/
- public QueryParserResultDml prepareDmlStatement(PreparedStatement
preparedStmt) {
- Prepared prep = GridSqlQueryParser.prepared(preparedStmt);
+ private Integer mvccCacheIdForSelect(Map<Object, Object> objMap) {
+ Boolean mvccEnabled = null;
+ Integer mvccCacheId = null;
+ GridCacheContextInfo cctx = null;
- return prepareDmlStatement(prep);
+ for (Object o : objMap.values()) {
+ if (o instanceof GridSqlAlias)
+ o = GridSqlAlias.unwrap((GridSqlAst)o);
+ if (o instanceof GridSqlTable && ((GridSqlTable)o).dataTable() !=
null) {
+ GridSqlTable tbl = (GridSqlTable)o;
+
+ if (tbl.dataTable() != null) {
+ GridCacheContextInfo curCctx = tbl.dataTable().cacheInfo();
+
+ assert curCctx != null;
+
+ boolean curMvccEnabled =
+ curCctx.config().getAtomicityMode() ==
CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+
+ if (mvccEnabled == null) {
+ mvccEnabled = curMvccEnabled;
+
+ if (mvccEnabled)
+ mvccCacheId = curCctx.cacheId();
+
+ cctx = curCctx;
+ }
+ else if (mvccEnabled != curMvccEnabled)
+
MvccUtils.throwAtomicityModesMismatchException(cctx.config(), curCctx.config());
+ }
+ }
+ }
+
+ return mvccCacheId;
}
/**
@@ -411,7 +478,7 @@ public class QueryParser {
* @param prepared Prepared.
* @return Statement.
*/
- public QueryParserResultDml prepareDmlStatement(Prepared prepared) {
+ private QueryParserResultDml prepareDmlStatement(Prepared prepared) {
// Prepare AST.
GridSqlQueryParser parser = new GridSqlQueryParser(false);
@@ -441,7 +508,16 @@ public class QueryParser {
MvccUtils.throwAtomicityModesMismatchException(ctx.config(),
curCtx.config());
}
- return new QueryParserResultDml(stmt, mvccEnabled);
+ // Get streamer info.
+ GridH2Table streamTbl = null;
+
+ if (GridSqlQueryParser.isStreamableInsertStatement(prepared)) {
+ GridSqlInsert insert = (GridSqlInsert)stmt;
+
+ streamTbl =
DmlAstUtils.gridTableForElement(insert.into()).dataTable();
+ }
+
+ return new QueryParserResultDml(stmt, mvccEnabled, streamTbl);
}
/**
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserResultDml.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserResultDml.java
index 03067e0..29299c8 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserResultDml.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserResultDml.java
@@ -17,8 +17,10 @@
package org.apache.ignite.internal.processors.query.h2;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlStatement;
import org.h2.command.Prepared;
+import org.jetbrains.annotations.Nullable;
/**
* Parsing result for DML statement.
@@ -30,14 +32,20 @@ public class QueryParserResultDml {
/** MVCC enabled flag. */
private final boolean mvccEnabled;
+ /** Streamer table. */
+ private final GridH2Table streamTbl;
+
/**
* Constructor.
*
* @param stmt Command.
+ * @param mvccEnabled Whether MVCC is enabled.
+ * @param streamTbl Streamer table.
*/
- public QueryParserResultDml(GridSqlStatement stmt, boolean mvccEnabled) {
+ public QueryParserResultDml(GridSqlStatement stmt, boolean mvccEnabled,
@Nullable GridH2Table streamTbl) {
this.stmt = stmt;
this.mvccEnabled = mvccEnabled;
+ this.streamTbl = streamTbl;
}
/**
@@ -53,4 +61,18 @@ public class QueryParserResultDml {
public boolean mvccEnabled() {
return mvccEnabled;
}
+
+ /**
+ * @return Streamer table.
+ */
+ @Nullable public GridH2Table streamTable() {
+ return streamTbl;
+ }
+
+ /**
+ * @return Whether statement can be used in streaming.
+ */
+ public boolean streamable() {
+ return streamTbl != null;
+ }
}
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserResultSelect.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserResultSelect.java
index 29cbe0e..77eec9a 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserResultSelect.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserResultSelect.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.query.h2;
import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlStatement;
import org.jetbrains.annotations.Nullable;
import java.util.List;
@@ -28,47 +29,62 @@ import java.util.List;
*/
@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
public class QueryParserResultSelect {
+ /** Statmement. */
+ private GridSqlStatement stmt;
+
/** Two-step query, or {@code} null if this result is for local query. */
private final GridCacheTwoStepQuery twoStepQry;
- /** Whether local split is needed. */
- private final boolean locSplit;
-
/** Metadata for two-step query, or {@code} null if this result is for
local query. */
private final List<GridQueryFieldMetadata> meta;
+ /** Involved cache IDs. */
+ private final List<Integer> cacheIds;
+
+ /** ID of the first MVCC cache. */
+ private final Integer mvccCacheId;
+
+ /** FOR UPDATE flag. */
+ private final boolean forUpdate;
+
/**
* Constructor.
*
+ * @param stmt Statement.
* @param twoStepQry Distributed query plan.
- * @param locSplit Whether local split is needed.
* @param meta Fields metadata.
+ * @param cacheIds Cache IDs.
+ * @param mvccCacheId ID of the first MVCC cache.
+ * @param forUpdate Whether this is FOR UPDATE flag.
*/
public QueryParserResultSelect(
+ GridSqlStatement stmt,
@Nullable GridCacheTwoStepQuery twoStepQry,
- boolean locSplit,
- List<GridQueryFieldMetadata> meta
+ List<GridQueryFieldMetadata> meta,
+ List<Integer> cacheIds,
+ @Nullable Integer mvccCacheId,
+ boolean forUpdate
) {
- // Local split can be true only is there is a two-step plan.
- assert twoStepQry == null && !locSplit || twoStepQry != null;
-
+ this.stmt = stmt;
this.twoStepQry = twoStepQry;
- this.locSplit = locSplit;
this.meta = meta;
+ this.cacheIds = cacheIds;
+ this.mvccCacheId = mvccCacheId;
+ this.forUpdate = forUpdate;
}
/**
- * @return Two-step query, or {@code} null if this result is for local
query.
+ * @return Parsed SELECT statement.
*/
- @Nullable public GridCacheTwoStepQuery twoStepQuery() {
- return twoStepQry;
+ public GridSqlStatement statement() {
+ return stmt;
}
/**
- * @return {@code True} if local query should be split.
+ * @return Two-step query, or {@code} null if this result is for local
query.
*/
- public boolean localSplit() {
- return locSplit;
+ @Nullable public GridCacheTwoStepQuery twoStepQuery() {
+ return twoStepQry;
}
/**
@@ -84,4 +100,32 @@ public class QueryParserResultSelect {
public boolean splitNeeded() {
return twoStepQry != null;
}
+
+ /**
+ * @return Involved cache IDs.
+ */
+ public List<Integer> cacheIds() {
+ return cacheIds;
+ }
+
+ /**
+ * @return ID of the first MVCC cache.
+ */
+ public Integer mvccCacheId() {
+ return mvccCacheId;
+ }
+
+ /**
+ * @return Whether this is a SELECT for MVCC caches.
+ */
+ public boolean mvccEnabled() {
+ return mvccCacheId != null;
+ }
+
+ /**
+ * @return Whether this is FOR UPDATE query.
+ */
+ public boolean forUpdate() {
+ return forUpdate;
+ }
}
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
index 709ce17..a2030ca 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
@@ -88,27 +88,25 @@ public final class UpdatePlanBuilder {
* Generate SELECT statements to retrieve data for modifications from and
find fast UPDATE or DELETE args,
* if available.
*
+ * @param schemaName Schema name.
* @param stmt Statement.
* @param mvccEnabled MVCC enabled flag.
- * @param loc Local query flag.
* @param idx Indexing.
- * @param conn Connection.
* @param fieldsQry Original query.
* @return Update plan.
*/
@SuppressWarnings("ConstantConditions")
public static UpdatePlan planForStatement(
+ String schemaName,
GridSqlStatement stmt,
boolean mvccEnabled,
- boolean loc,
IgniteH2Indexing idx,
- @Nullable Connection conn,
@Nullable SqlFieldsQuery fieldsQry
) throws IgniteCheckedException {
if (stmt instanceof GridSqlMerge || stmt instanceof GridSqlInsert)
- return planForInsert(stmt, loc, idx, mvccEnabled, conn, fieldsQry);
+ return planForInsert(schemaName, stmt, idx, mvccEnabled,
fieldsQry);
else if (stmt instanceof GridSqlUpdate || stmt instanceof
GridSqlDelete)
- return planForUpdate(stmt, loc, idx, mvccEnabled, conn, fieldsQry);
+ return planForUpdate(schemaName, stmt, idx, mvccEnabled,
fieldsQry);
else
throw new IgniteSQLException("Unsupported operation: " +
stmt.getSQL(),
IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
@@ -117,19 +115,22 @@ public final class UpdatePlanBuilder {
/**
* Prepare update plan for INSERT or MERGE.
*
+ * @param schemaName Schema name.
* @param stmt INSERT or MERGE statement.
- * @param loc Local query flag.
* @param idx Indexing.
* @param mvccEnabled Mvcc flag.
- * @param conn Connection.
* @param fieldsQuery Original query.
* @return Update plan.
* @throws IgniteCheckedException if failed.
*/
@SuppressWarnings("ConstantConditions")
- private static UpdatePlan planForInsert(GridSqlStatement stmt, boolean
loc, IgniteH2Indexing idx,
- boolean mvccEnabled, @Nullable Connection conn, @Nullable
SqlFieldsQuery fieldsQuery)
- throws IgniteCheckedException {
+ private static UpdatePlan planForInsert(
+ String schemaName,
+ GridSqlStatement stmt,
+ IgniteH2Indexing idx,
+ boolean mvccEnabled,
+ @Nullable SqlFieldsQuery fieldsQuery
+ ) throws IgniteCheckedException {
GridSqlQuery sel = null;
GridSqlElement target;
@@ -251,9 +252,18 @@ public final class UpdatePlanBuilder {
String selectSql = sel != null ? sel.getSQL() : null;
- DmlDistributedPlanInfo distributed = (rowsNum == 0 &&
!F.isEmpty(selectSql)) ?
- checkPlanCanBeDistributed(idx, mvccEnabled, conn, fieldsQuery,
loc, selectSql,
- tbl.dataTable().cacheName()) : null;
+ DmlDistributedPlanInfo distributed = null;
+
+ if (rowsNum == 0 && !F.isEmpty(selectSql)) {
+ distributed = checkPlanCanBeDistributed(
+ idx,
+ mvccEnabled,
+ schemaName,
+ fieldsQuery,
+ selectSql,
+ tbl.dataTable().cacheName()
+ );
+ }
UpdateMode mode = stmt instanceof GridSqlMerge ? UpdateMode.MERGE :
UpdateMode.INSERT;
@@ -323,21 +333,19 @@ public final class UpdatePlanBuilder {
/**
* Prepare update plan for UPDATE or DELETE.
*
+ * @param schemaName Schema name.
* @param stmt UPDATE or DELETE statement.
- * @param loc Local query flag.
* @param idx Indexing.
- * @param mvccEnabled Mvcc flag.
- * @param conn Connection.
+ * @param mvccEnabled MVCC flag.
* @param fieldsQuery Original query.
* @return Update plan.
* @throws IgniteCheckedException if failed.
*/
private static UpdatePlan planForUpdate(
+ String schemaName,
GridSqlStatement stmt,
- boolean loc,
IgniteH2Indexing idx,
boolean mvccEnabled,
- @Nullable Connection conn,
@Nullable SqlFieldsQuery fieldsQuery
) throws IgniteCheckedException {
GridSqlElement target;
@@ -428,9 +436,18 @@ public final class UpdatePlanBuilder {
String selectSql = sel.getSQL();
- DmlDistributedPlanInfo distributed = F.isEmpty(selectSql) ?
null :
- checkPlanCanBeDistributed(idx, mvccEnabled, conn,
fieldsQuery, loc, selectSql,
- tbl.dataTable().cacheName());
+ DmlDistributedPlanInfo distributed = null;
+
+ if (!F.isEmpty(selectSql)) {
+ distributed = checkPlanCanBeDistributed(
+ idx,
+ mvccEnabled,
+ schemaName,
+ fieldsQuery,
+ selectSql,
+ tbl.dataTable().cacheName()
+ );
+ }
return new UpdatePlan(
UpdateMode.UPDATE,
@@ -454,9 +471,18 @@ public final class UpdatePlanBuilder {
String selectSql = sel.getSQL();
- DmlDistributedPlanInfo distributed = F.isEmpty(selectSql) ?
null :
- checkPlanCanBeDistributed(idx, mvccEnabled, conn,
fieldsQuery, loc, selectSql,
- tbl.dataTable().cacheName());
+ DmlDistributedPlanInfo distributed = null;
+
+ if (!F.isEmpty(selectSql)) {
+ distributed = checkPlanCanBeDistributed(
+ idx,
+ mvccEnabled,
+ schemaName,
+ fieldsQuery,
+ selectSql,
+ tbl.dataTable().cacheName()
+ );
+ }
return new UpdatePlan(
UpdateMode.DELETE,
@@ -836,23 +862,26 @@ public final class UpdatePlanBuilder {
*
* @param idx Indexing.
* @param mvccEnabled Mvcc flag.
- * @param conn Connection.
+ * @param schemaName Schema name.
* @param fieldsQry Initial update query.
- * @param loc Local query flag.
* @param selectQry Derived select query.
* @param cacheName Cache name.
* @return distributed update plan info, or {@code null} if cannot be
distributed.
* @throws IgniteCheckedException if failed.
*/
- private static DmlDistributedPlanInfo
checkPlanCanBeDistributed(IgniteH2Indexing idx, boolean mvccEnabled,
- Connection conn, SqlFieldsQuery fieldsQry, boolean loc, String
selectQry, String cacheName)
+ private static DmlDistributedPlanInfo checkPlanCanBeDistributed(
+ IgniteH2Indexing idx,
+ boolean mvccEnabled,
+ String schemaName,
+ SqlFieldsQuery fieldsQry,
+ String selectQry,
+ String cacheName
+ )
throws IgniteCheckedException {
- if (loc || (!mvccEnabled && !isSkipReducerOnUpdateQuery(fieldsQry)) ||
DmlUtils.isBatched(fieldsQry))
+ if ((!mvccEnabled && !isSkipReducerOnUpdateQuery(fieldsQry)) ||
DmlUtils.isBatched(fieldsQry))
return null;
- assert conn != null;
-
- try {
+ try (Connection conn =
idx.connections().connectionNoCache(schemaName)) {
// Get a new prepared statement for derived select query.
try (PreparedStatement stmt = conn.prepareStatement(selectQry)) {
H2Utils.bindParameters(stmt, F.asList(fieldsQry.getArgs()));
@@ -869,13 +898,12 @@ public final class UpdatePlanBuilder {
);
boolean distributed =
- !qry.isLocalSplit() &&
// No split for local qry
+ !qry.isLocalSplit() &&
// No split for local
qry.hasCacheIds() &&
// Over real caches
qry.skipMergeTable() &&
// No merge table
qry.mapQueries().size() == 1 &&
!qry.mapQueries().get(0).hasSubQueries(); // One w/o subqueries
if (distributed) {
- // TODO: This should be done during plan build.
List<Integer> cacheIds = H2Utils.collectCacheIds(idx,
CU.cacheId(cacheName), qry.tables());
H2Utils.checkQuery(idx, cacheIds, qry.mvccEnabled(),
qry.forUpdate(), qry.tables());
@@ -897,7 +925,7 @@ public final class UpdatePlanBuilder {
* @param qry Query.
* @return {@code true} if update can be distributed.
*/
- public static boolean isSkipReducerOnUpdateQuery(SqlFieldsQuery qry) {
+ private static boolean isSkipReducerOnUpdateQuery(SqlFieldsQuery qry) {
return qry != null && !qry.isLocal() &&
qry instanceof SqlFieldsQueryEx &&
((SqlFieldsQueryEx)qry).isSkipReducerOnUpdate();
}
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
index 4d298ff..7417ba2 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
@@ -582,11 +582,7 @@ public class GridSqlQueryParser {
/** */
private static Command extractCommand(PreparedStatement stmt) {
- try {
- return COMMAND.get(stmt.unwrap(JdbcPreparedStatement.class));
- } catch (SQLException e) {
- throw new IgniteSQLException(e);
- }
+ return COMMAND.get((JdbcPreparedStatement)stmt);
}
/**
@@ -1784,7 +1780,7 @@ public class GridSqlQueryParser {
/**
* @return All known cache IDs.
*/
- public Collection<Integer> cacheIds() {
+ public List<Integer> cacheIds() {
ArrayList<Integer> res = new ArrayList<>(1);
for (Object o : h2ObjToGridObj.values()) {
@@ -2303,6 +2299,16 @@ public class GridSqlQueryParser {
public static boolean isStreamableInsertStatement(PreparedStatement
nativeStmt) {
Prepared prep = prepared(nativeStmt);
+ return isStreamableInsertStatement(prep);
+ }
+
+ /**
+ * Check if passed statement is insert statement eligible for streaming.
+ *
+ * @param prep Prepared statement.
+ * @return {@code True} if streamable insert.
+ */
+ public static boolean isStreamableInsertStatement(Prepared prep) {
return prep instanceof Insert && INSERT_QUERY.get((Insert)prep) ==
null;
}
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/RunningQueriesTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/RunningQueriesTest.java
index 15c5abe..e7f3b8c 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/RunningQueriesTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/RunningQueriesTest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.query;
import java.sql.Connection;
-import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collection;
@@ -554,43 +553,6 @@ public class RunningQueriesTest extends
AbstractIndexingCommonTest {
}
/**
- * Check tracking running queries for stream batching.
- *
- * @throws Exception in case of failure.
- */
- @Test
- public void testJdbcStreamBatchUpdate() throws Exception {
- try (Connection conn = GridTestUtils.connect(ignite, null); Statement
stmt = conn.createStatement()) {
- conn.setSchema("\"default\"");
-
- newBarrier(1);
-
- final int BATCH_SIZE = 10;
-
- stmt.executeUpdate("SET STREAMING ON BATCH_SIZE " + BATCH_SIZE);
-
- newBarrier(2);
-
- for (int i = 0; i < BATCH_SIZE; i++)
- stmt.addBatch("insert into Integer (_key, _val) values (" + i
+ "," + i + ")");
-
- for (int i = 0; i < BATCH_SIZE; i++) {
- assertWaitingOnBarrier();
-
- awaitTimeouted();
-
- assertWaitingOnBarrier();
-
- Collection<GridRunningQueryInfo> runningQueries =
ignite.context().query().runningQueries(-1);
-
- assertEquals(1, runningQueries.size());
-
- awaitTimeouted();
- }
- }
- }
-
- /**
* Check tracking running queries for stream COPY command.
*
* @throws SQLException If failed.
@@ -682,18 +644,6 @@ public class RunningQueriesTest extends
AbstractIndexingCommonTest {
*/
private static class BlockingIndexing extends IgniteH2Indexing {
/** {@inheritDoc} */
- @Override public void checkStatementStreamable(PreparedStatement
nativeStmt) {
- super.checkStatementStreamable(nativeStmt);
-
- try {
- barrier.await();
- }
- catch (Exception e) {
- throw new IgniteException(e);
- }
- }
-
- /** {@inheritDoc} */
@Override public List<FieldsQueryCursor<List<?>>>
querySqlFields(String schemaName, SqlFieldsQuery qry,
@Nullable SqlClientContext cliCtx, boolean keepBinary, boolean
failOnMultipleStmts,
MvccQueryTracker tracker, GridQueryCancel cancel, boolean
registerAsNewQry) {
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/H2StatementCacheSelfTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/H2StatementCacheSelfTest.java
deleted file mode 100644
index de1fb09..0000000
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/H2StatementCacheSelfTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2;
-
-import java.sql.PreparedStatement;
-import
org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
-import org.junit.Test;
-
-/**
- *
- */
-public class H2StatementCacheSelfTest extends AbstractIndexingCommonTest {
- /**
- * @throws Exception If failed.
- */
- @Test
- public void testEviction() throws Exception {
- H2StatementCache stmtCache = new H2StatementCache(1);
- H2CachedStatementKey key1 = new H2CachedStatementKey("", "1");
- PreparedStatement stmt1 = stmt();
- stmtCache.put(key1, stmt1);
-
- assertSame(stmt1, stmtCache.get(key1));
-
- stmtCache.put(new H2CachedStatementKey("mydb", "2"), stmt());
-
- assertNull(stmtCache.get(key1));
- }
-
- /**
- * @throws Exception If failed.
- */
- @Test
- public void testLruEvictionInStoreOrder() throws Exception {
- H2StatementCache stmtCache = new H2StatementCache(2);
-
- H2CachedStatementKey key1 = new H2CachedStatementKey("", "1");
- H2CachedStatementKey key2 = new H2CachedStatementKey("", "2");
- stmtCache.put(key1, stmt());
- stmtCache.put(key2, stmt());
-
- stmtCache.put(new H2CachedStatementKey("", "3"), stmt());
-
- assertNull(stmtCache.get(key1));
- }
-
- /**
- * @throws Exception If failed.
- */
- @Test
- public void testLruEvictionInAccessOrder() throws Exception {
- H2StatementCache stmtCache = new H2StatementCache(2);
-
- H2CachedStatementKey key1 = new H2CachedStatementKey("", "1");
- H2CachedStatementKey key2 = new H2CachedStatementKey("", "2");
- stmtCache.put(key1, stmt());
- stmtCache.put(key2, stmt());
- stmtCache.get(key1);
-
- stmtCache.put(new H2CachedStatementKey("", "3"), stmt());
-
- assertNull(stmtCache.get(key2));
- }
-
- /**
- *
- */
- private static PreparedStatement stmt() {
- return new PreparedStatementExImpl(null);
- }
-}
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/PreparedStatementExSelfTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/PreparedStatementExSelfTest.java
deleted file mode 100644
index f0c7986..0000000
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/PreparedStatementExSelfTest.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2;
-
-import java.sql.PreparedStatement;
-import
org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
-import org.junit.Test;
-
-/**
- *
- */
-public class PreparedStatementExSelfTest extends AbstractIndexingCommonTest {
- /**
- * @throws Exception If failed.
- */
- @Test
- public void testStoringMeta() throws Exception {
- PreparedStatement stmt = stmt();
-
- PreparedStatementEx wrapped = stmt.unwrap(PreparedStatementEx.class);
-
- wrapped.putMeta(0, "0");
-
- assertEquals("0", wrapped.meta(0));
- }
-
- /**
- * @throws Exception If failed.
- */
- @Test
- public void testStoringMoreMetaKeepsExisting() throws Exception {
- PreparedStatement stmt = stmt();
-
- PreparedStatementEx wrapped = stmt.unwrap(PreparedStatementEx.class);
-
- wrapped.putMeta(0, "0");
- wrapped.putMeta(1, "1");
-
- assertEquals("0", wrapped.meta(0));
- assertEquals("1", wrapped.meta(1));
- }
-
- /**
- *
- */
- private static PreparedStatement stmt() {
- return new PreparedStatementExImpl(null);
- }
-}
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
index 095518c..0093108 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
@@ -205,10 +205,8 @@ import
org.apache.ignite.internal.processors.query.SqlSchemaSelfTest;
import org.apache.ignite.internal.processors.query.SqlSystemViewsSelfTest;
import org.apache.ignite.internal.processors.query.h2.GridIndexRebuildSelfTest;
import
org.apache.ignite.internal.processors.query.h2.H2ResultSetIteratorNullifyOnEndSelfTest;
-import org.apache.ignite.internal.processors.query.h2.H2StatementCacheSelfTest;
import
org.apache.ignite.internal.processors.query.h2.IgniteSqlBigIntegerKeyTest;
import org.apache.ignite.internal.processors.query.h2.IgniteSqlQueryMinMaxTest;
-import
org.apache.ignite.internal.processors.query.h2.PreparedStatementExSelfTest;
import org.apache.ignite.internal.processors.query.h2.QueryDataPageScanTest;
import
org.apache.ignite.internal.processors.query.h2.ThreadLocalObjectPoolSelfTest;
import
org.apache.ignite.internal.processors.query.h2.sql.BaseH2CompareQueryTest;
@@ -513,8 +511,6 @@ import org.junit.runners.Suite;
EncryptedSqlTableTest.class,
ThreadLocalObjectPoolSelfTest.class,
- H2StatementCacheSelfTest.class,
- PreparedStatementExSelfTest.class,
// Partition loss.
IndexingCachePartitionLossPolicySelfTest.class,