This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch gg-18540 in repository https://gitbox.apache.org/repos/asf/ignite.git
commit 9666cbe16841fa78b1e5d59f8c0d1578cd0060b6 Author: tledkov <[email protected]> AuthorDate: Mon May 6 14:10:02 2019 +0300 GG-17385 [IGNITE-11499] SQL: DML internal batch size is 1 by default to prevent deadlock --- .../jdbc/thin/JdbcThinConnectionSelfTest.java | 33 +++- .../apache/ignite/cache/query/SqlFieldsQuery.java | 35 ++++ .../internal/jdbc/thin/ConnectionProperties.java | 15 ++ .../jdbc/thin/ConnectionPropertiesImpl.java | 29 +++- .../ignite/internal/jdbc/thin/JdbcThinTcpIo.java | 8 +- .../odbc/jdbc/JdbcConnectionContext.java | 11 +- .../processors/odbc/jdbc/JdbcRequestHandler.java | 9 +- .../internal/processors/odbc/jdbc/JdbcUtils.java | 24 +++ .../processors/odbc/odbc/OdbcRequestHandler.java | 1 + .../processors/query/SqlClientContext.java | 17 +- .../processors/query/h2/H2TableDescriptor.java | 4 +- .../processors/query/h2/IgniteH2Indexing.java | 6 +- .../processors/query/h2/QueryParameters.java | 27 +++- .../processors/query/DmlBatchSizeDeadlockTest.java | 178 +++++++++++++++++++++ .../query/IgniteSqlNotNullConstraintTest.java | 40 ++--- .../IgniteBinaryCacheQueryTestSuite2.java | 2 + 16 files changed, 389 insertions(+), 50 deletions(-) diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java index 1dc35f6..56e0aa5 100644 --- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java @@ -247,15 +247,42 @@ public class JdbcThinConnectionSelfTest extends JdbcThinAbstractSelfTest { } /** + * Test update batch size property. + * + * @throws Exception If failed. + */ + @Test + public void testUpdateBatchSize() throws Exception { + assertInvalid(urlWithAffinityAwarenessFlagSemicolon + ";updateBatchSize=-1", + "Property cannot be lower than 1 [name=updateBatchSize, value=-1]"); + + try (Connection conn = DriverManager.getConnection(urlWithAffinityAwarenessFlagSemicolon)) { + for (JdbcThinTcpIo io: ios(conn)) + assertNull(io.connectionProperties().getUpdateBatchSize()); + } + + try (Connection conn = DriverManager.getConnection(urlWithAffinityAwarenessFlagSemicolon + + ";updateBatchSize=1024")) { + for (JdbcThinTcpIo io: ios(conn)) + assertEquals(1024, (int)io.connectionProperties().getUpdateBatchSize()); + } + + try (Connection conn = DriverManager.getConnection(urlWithAffinityAwarenessFlag + + "&updateBatchSize=1024")) { + for (JdbcThinTcpIo io: ios(conn)) + assertEquals(1024, (int)io.connectionProperties().getUpdateBatchSize()); + } + } + + /** * Test SQL hints. * * @throws Exception If failed. */ @Test public void testSqlHints() throws Exception { - try (Connection conn = DriverManager.getConnection(urlWithAffinityAwarenessFlag)) { - assertHints(conn, false, false, false, false, false, - false, affinityAwareness); + try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1")) { + assertHints(conn, false, false, false, false, false, false, affinityAwareness); } try (Connection conn = DriverManager.getConnection(urlWithAffinityAwarenessFlag + "&distributedJoins=true")) { diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java index 7ee7618..8be968e 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java @@ -49,6 +49,9 @@ public class SqlFieldsQuery extends Query<List<?>> { /** */ private static final long serialVersionUID = 0L; + /** Default value of the update internal batch size. */ + private static final int DFLT_UPDATE_BATCH_SIZE = 1; + /** Do not remove. For tests only. */ @SuppressWarnings("NonConstantFieldWithUpperCaseName") private static boolean DFLT_LAZY; @@ -88,6 +91,12 @@ public class SqlFieldsQuery extends Query<List<?>> { private Boolean dataPageScanEnabled; /** + * Update internal batch size. Default is 1 to prevent deadlock on update where keys sequence are different in + * several concurrent updates. + */ + private int updateBatchSize = DFLT_UPDATE_BATCH_SIZE; + + /** * Copy constructs SQL fields query. * * @param qry SQL query. @@ -104,6 +113,7 @@ public class SqlFieldsQuery extends Query<List<?>> { parts = qry.parts; schema = qry.schema; dataPageScanEnabled = qry.dataPageScanEnabled; + updateBatchSize = qry.updateBatchSize; } /** @@ -408,6 +418,31 @@ public class SqlFieldsQuery extends Query<List<?>> { } /** + * Gets update internal bach size. + * Default is 1 to prevent deadlock on update where keys sequence are different in + * several concurrent updates. + * + * @return Update internal batch size + */ + public int getUpdateBatchSize() { + return updateBatchSize; + } + + /** + * Sets update internal bach size. + * Default is 1 to prevent deadlock on update where keys sequence are different in + * several concurrent updates. + * + * @param updateBatchSize Update internal batch size. + * @return {@code this} for chaining. + */ + public SqlFieldsQuery setUpdateBatchSize(int updateBatchSize) { + this.updateBatchSize = updateBatchSize; + + return this; + } + + /** * @return Copy of this query. */ public SqlFieldsQuery copy() { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionProperties.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionProperties.java index 53df56e..2cab155 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionProperties.java @@ -416,4 +416,19 @@ public interface ConnectionProperties { * for this connection, if {@code false} then it's disabled. */ public void setAffinityAwareness(boolean affinityAwareness); + + /** + * Note: Batch size of 1 prevents deadlock on update where keys sequence are different in several concurrent updates. + * + * @return update internal bach size. + */ + @Nullable public Integer getUpdateBatchSize(); + + /** + * Note: Set to 1 to prevent deadlock on update where keys sequence are different in several concurrent updates. + * + * @param updateBatchSize update internal bach size. + * @throws SQLException On error. + */ + public void setUpdateBatchSize(@Nullable Integer updateBatchSize) throws SQLException; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java index e9d5b0c..02ca1e7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java @@ -194,6 +194,12 @@ public class ConnectionPropertiesImpl implements ConnectionProperties, Serializa "Whether jdbc thin affinity awareness is enabled.", false, false); + /** Update batch size (the size of internal batches are used for INSERT/UPDATE/DELETE operation). */ + private IntegerProperty updateBatchSize = new IntegerProperty("updateBatchSize", + "Update bach size (the size of internal batches are used for INSERT/UPDATE/DELETE operation). " + + "Set to 1 to prevent deadlock on update where keys sequence are different " + + "in several concurrent updates.", null, false, 1, Integer.MAX_VALUE); + /** Properties array. */ private final ConnectionProperty [] propsArray = { distributedJoins, enforceJoinOrder, collocated, replicatedOnly, autoCloseServerCursor, @@ -204,7 +210,8 @@ public class ConnectionPropertiesImpl implements ConnectionProperties, Serializa sslTrustAll, sslFactory, user, passwd, dataPageScanEnabled, - affinityAwareness + affinityAwareness, + updateBatchSize }; /** {@inheritDoc} */ @@ -520,6 +527,16 @@ public class ConnectionPropertiesImpl implements ConnectionProperties, Serializa this.affinityAwareness.setValue(affinityAwareness); } + /** {@inheritDoc} */ + @Override public @Nullable Integer getUpdateBatchSize() { + return updateBatchSize.value(); + } + + /** {@inheritDoc} */ + @Override public void setUpdateBatchSize(@Nullable Integer updateBatchSize) throws SQLException { + this.updateBatchSize.setValue(updateBatchSize); + } + /** * @param url URL connection. * @param props Environment properties. @@ -1020,8 +1037,6 @@ public class ConnectionPropertiesImpl implements ConnectionProperties, Serializa NumberProperty(String name, String desc, Number dfltVal, boolean required, Number min, Number max) { super(name, desc, dfltVal, null, required); - assert dfltVal != null; - val = dfltVal; range = new Number[] {min, max}; @@ -1030,7 +1045,7 @@ public class ConnectionPropertiesImpl implements ConnectionProperties, Serializa /** {@inheritDoc} */ @Override void init(String str) throws SQLException { if (str == null) - val = (int)dfltVal; + val = dfltVal != null ? (int)dfltVal : null; else { try { setValue(parse(str)); @@ -1051,7 +1066,7 @@ public class ConnectionPropertiesImpl implements ConnectionProperties, Serializa /** {@inheritDoc} */ @Override String valueObject() { - return String.valueOf(val); + return val != null ? String.valueOf(val) : null; } /** @@ -1102,8 +1117,8 @@ public class ConnectionPropertiesImpl implements ConnectionProperties, Serializa /** * @return Property value. */ - int value() { - return val.intValue(); + Integer value() { + return val != null ? val.intValue() : null; } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java index d741320..7c5759e4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java @@ -23,8 +23,8 @@ import java.net.InetSocketAddress; import java.net.Socket; import java.sql.SQLException; import java.util.List; -import java.util.UUID; import java.util.Random; +import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.query.QueryCancelledException; @@ -47,6 +47,7 @@ import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryFetchRequest; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryMetadataRequest; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcResponse; +import org.apache.ignite.internal.processors.odbc.jdbc.JdbcUtils; import org.apache.ignite.internal.util.ipc.loopback.IpcClientTcpEndpoint; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; @@ -246,9 +247,12 @@ public class JdbcThinTcpIo { if (ver.compareTo(VER_2_7_0) >= 0) writer.writeString(connProps.nestedTxMode()); - if (ver.compareTo(VER_2_8_0) >= 0) + if (ver.compareTo(VER_2_8_0) >= 0) { writer.writeByte(nullableBooleanToByte(connProps.isDataPageScanEnabled())); + JdbcUtils.writeNullableInteger(writer, connProps.getUpdateBatchSize()); + } + if (!F.isEmpty(connProps.getUsername())) { assert ver.compareTo(VER_2_5_0) >= 0 : "Authentication is supported since 2.5"; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java index f713b53..34cdabe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java @@ -106,7 +106,7 @@ public class JdbcConnectionContext extends ClientListenerAbstractConnectionConte * @param ctx Kernal Context. * @param ses Session. * @param busyLock Shutdown busy lock. - * @param connId + * @param connId Connection ID. * @param maxCursors Maximum allowed cursors. */ public JdbcConnectionContext(GridKernalContext ctx, GridNioSession ses, GridSpinBusyLock busyLock, long connId, @@ -166,12 +166,15 @@ public class JdbcConnectionContext extends ClientListenerAbstractConnectionConte } } - Boolean dataPageScanEnabled = null; + Integer updateBatchSize = null; - if (ver.compareTo(VER_2_8_0) >= 0) + if (ver.compareTo(VER_2_8_0) >= 0) { dataPageScanEnabled = nullableBooleanFromByte(reader.readByte()); + updateBatchSize = JdbcUtils.readNullableInteger(reader); + } + if (ver.compareTo(VER_2_5_0) >= 0) { String user = null; String passwd = null; @@ -206,7 +209,7 @@ public class JdbcConnectionContext extends ClientListenerAbstractConnectionConte handler = new JdbcRequestHandler(busyLock, sender, maxCursors, distributedJoins, enforceJoinOrder, collocated, replicatedOnly, autoCloseCursors, lazyExec, skipReducerOnUpdate, nestedTxMode, - dataPageScanEnabled, actx, ver, this); + dataPageScanEnabled, updateBatchSize, actx, ver, this); handler.start(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java index 362b0c8..42edcc8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java @@ -171,6 +171,8 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { * @param autoCloseCursors Flag to automatically close server cursors. * @param lazy Lazy query execution flag. * @param skipReducerOnUpdate Skip reducer on update flag. + * @param dataPageScanEnabled Enable scan data page mode. + * @param updateBatchSize Size of internal batch for DML queries. * @param actx Authentication context. * @param protocolVer Protocol version. * @param connCtx Jdbc connection context. @@ -188,6 +190,7 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { boolean skipReducerOnUpdate, NestedTxMode nestedTxMode, @Nullable Boolean dataPageScanEnabled, + @Nullable Integer updateBatchSize, AuthorizationContext actx, ClientListenerProtocolVersion protocolVer, JdbcConnectionContext connCtx @@ -212,7 +215,8 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { replicatedOnly, lazy, skipReducerOnUpdate, - dataPageScanEnabled + dataPageScanEnabled, + updateBatchSize ); this.busyLock = busyLock; @@ -968,6 +972,9 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { if (cliCtx.dataPageScanEnabled() != null) qry.setDataPageScanEnabled(cliCtx.dataPageScanEnabled()); + + if (cliCtx.updateBatchSize() != null) + qry.setUpdateBatchSize(cliCtx.updateBatchSize()); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcUtils.java index f07a295..1befe4a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcUtils.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.ignite.internal.binary.BinaryReaderExImpl; import org.apache.ignite.internal.binary.BinaryWriterExImpl; import org.apache.ignite.internal.processors.odbc.SqlListenerUtils; +import org.jetbrains.annotations.Nullable; /** * Various JDBC utility methods. @@ -104,4 +105,27 @@ public class JdbcUtils { else return Collections.emptyList(); } + + /** + * Read nullable Integer. + * + * @param reader Binary reader. + * @return read value. + */ + @Nullable public static Integer readNullableInteger(BinaryReaderExImpl reader) { + return reader.readBoolean() ? reader.readInt() : null; + } + + /** + * Write nullable integer. + * + * @param writer Binary writer. + * @param val Integer value.. + */ + public static void writeNullableInteger(BinaryWriterExImpl writer, @Nullable Integer val) { + writer.writeBoolean(val != null); + + if (val != null) + writer.writeInt(val); + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java index 7d3f9bb..2dd7338 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java @@ -166,6 +166,7 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler { replicatedOnly, lazy, skipReducerOnUpdate, + null, null ); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/SqlClientContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/SqlClientContext.java index 46e918e..a365e13 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/SqlClientContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/SqlClientContext.java @@ -60,6 +60,9 @@ public class SqlClientContext implements AutoCloseable { /** Data page scan support for query execution. */ private final @Nullable Boolean dataPageScanEnabled; + /** Update internal batch size. */ + private final @Nullable Integer updateBatchSize; + /** Monitor for stream operations. */ private final Object muxStreamer = new Object(); @@ -103,11 +106,15 @@ public class SqlClientContext implements AutoCloseable { * @param replicatedOnly Replicated caches only flag. * @param lazy Lazy query execution flag. * @param skipReducerOnUpdate Skip reducer on update flag. + * @param dataPageScanEnabled Enable scan data page mode. + * @param updateBatchSize Size of internal batch for DML queries. */ public SqlClientContext(GridKernalContext ctx, Factory<GridWorker> orderedBatchWorkerFactory, boolean distributedJoins, boolean enforceJoinOrder, boolean collocated, boolean replicatedOnly, boolean lazy, boolean skipReducerOnUpdate, - @Nullable Boolean dataPageScanEnabled) { + @Nullable Boolean dataPageScanEnabled, + @Nullable Integer updateBatchSize + ) { this.ctx = ctx; this.orderedBatchWorkerFactory = orderedBatchWorkerFactory; this.distributedJoins = distributedJoins; @@ -117,6 +124,7 @@ public class SqlClientContext implements AutoCloseable { this.lazy = lazy; this.skipReducerOnUpdate = skipReducerOnUpdate; this.dataPageScanEnabled = dataPageScanEnabled; + this.updateBatchSize = updateBatchSize; log = ctx.log(SqlClientContext.class.getName()); } @@ -227,6 +235,13 @@ public class SqlClientContext implements AutoCloseable { } /** + * @return Update internal batch size. + */ + public @Nullable Integer updateBatchSize() { + return updateBatchSize; + } + + /** * @return Streaming state flag (on or off). */ public boolean isStream() { diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2TableDescriptor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2TableDescriptor.java index 6d43f48..003776b 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2TableDescriptor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2TableDescriptor.java @@ -50,10 +50,10 @@ public class H2TableDescriptor { /** PK index name. */ public static final String PK_IDX_NAME = "_key_PK"; - /** PK hashindex name */ + /** PK hash index name. */ public static final String PK_HASH_IDX_NAME = "_key_PK_hash"; - /** Affinity key index name */ + /** Affinity key index name. */ public static final String AFFINITY_KEY_IDX_NAME = "AFFINITY_KEY"; /** Indexing. */ diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 598072f..154a624 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -1268,6 +1268,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { * @param cancel Cancel. * @param timeout Timeout. * @return Fields query. + * @throws IgniteCheckedException On error. */ private QueryCursorImpl<List<?>> executeSelectForDml( String schema, @@ -1314,6 +1315,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { * @param inTx Flag whether query is executed within transaction. * @param timeout Timeout. * @return Query result. + * @throws IgniteCheckedException On error. */ private Iterable<List<?>> executeSelect0( QueryDescriptor qryDesc, @@ -2349,7 +2351,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { List<List<List<?>>> cur = plan.createRows(argss); //TODO: IGNITE-11176 - Need to support cancellation - ress = DmlUtils.processSelectResultBatched(plan, cur, qryParams.pageSize()); + ress = DmlUtils.processSelectResultBatched(plan, cur, qryParams.updateBatchSize()); } finally { DmlUtils.restoreKeepBinaryContext(cctx, opCtx); @@ -2623,7 +2625,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { }, cancel); } - int pageSize = loc ? 0 : qryParams.pageSize(); + int pageSize = qryParams.updateBatchSize(); //TODO: IGNITE-11176 - Need to support cancellation return DmlUtils.processSelectResult(plan, cur, pageSize); diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParameters.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParameters.java index 15e6abe..2b33f4a 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParameters.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParameters.java @@ -55,6 +55,12 @@ public class QueryParameters { private final List<Object[]> batchedArgs; /** + * Update internal batch size. + * Default is 1 to prevent deadlock on update where keys sequence are different in several concurrent updates. + */ + private final int updateBatchSize; + + /** * Create parameters from query. * * @param qry Query. @@ -85,7 +91,8 @@ public class QueryParameters { qry.isDataPageScanEnabled(), nestedTxMode, autoCommit, - batchedArgs + batchedArgs, + qry.getUpdateBatchSize() ); } @@ -101,6 +108,7 @@ public class QueryParameters { * @param nestedTxMode Nested TX mode. * @param autoCommit Auto-commit flag. * @param batchedArgs Batched arguments. + * @param updateBatchSize Update internal batch size. */ @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType") private QueryParameters( @@ -112,7 +120,8 @@ public class QueryParameters { Boolean dataPageScanEnabled, NestedTxMode nestedTxMode, boolean autoCommit, - List<Object[]> batchedArgs + List<Object[]> batchedArgs, + int updateBatchSize ) { this.args = args; this.parts = parts; @@ -123,6 +132,7 @@ public class QueryParameters { this.nestedTxMode = nestedTxMode; this.autoCommit = autoCommit; this.batchedArgs = batchedArgs; + this.updateBatchSize = updateBatchSize; } /** @@ -192,6 +202,16 @@ public class QueryParameters { } /** + * Gets update internal bach size. + * Default is 1 to prevent deadlock on update where keys sequance are different in several concurrent updates. + * + * @return Update internal batch size + */ + public int updateBatchSize() { + return updateBatchSize; + } + + /** * Convert current batched arguments to a form with single arguments. * * @param args Arguments. @@ -207,7 +227,8 @@ public class QueryParameters { this.dataPageScanEnabled, this.nestedTxMode, this.autoCommit, - null + null, + this.updateBatchSize ); } } diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/DmlBatchSizeDeadlockTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/DmlBatchSizeDeadlockTest.java new file mode 100644 index 0000000..790ab72 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/DmlBatchSizeDeadlockTest.java @@ -0,0 +1,178 @@ +/* + * Copyright 2019 GridGain Systems, Inc. and Contributors. + * + * Licensed under the GridGain Community Edition License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cache.query.FieldsQueryCursor; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.testframework.GridTestUtils; +import org.junit.Test; + +/** + * Tests DML deadlock with different update batch size. + */ +public class DmlBatchSizeDeadlockTest extends AbstractIndexingCommonTest { + /** Keys count. */ + private static final int KEY_CNT = 1000; + + /** Test time to run. */ + private static final int TEST_TIME = 20_000; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + startGrid(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + super.afterTest(); + } + + /** + * @throws IgniteCheckedException On error. + */ + @Test + public void testDeadlockOnDmlAtomic() throws IgniteCheckedException { + checkDeadlockOnDml(CacheAtomicityMode.ATOMIC); + } + + /** + * @throws IgniteCheckedException On error. + */ + @Test + public void testDeadlockOnDmlTransactional() throws IgniteCheckedException { + checkDeadlockOnDml(CacheAtomicityMode.TRANSACTIONAL); + } + + /** + * @param mode Atomicity mode. + * @throws IgniteCheckedException On failed. + */ + public void checkDeadlockOnDml(CacheAtomicityMode mode) throws IgniteCheckedException { + IgniteCache<Long, Long> cache = createCache(mode); + + final long tEnd = U.currentTimeMillis() + TEST_TIME; + + final IgniteInternalFuture futAsc = GridTestUtils.runAsync(() -> { + while (U.currentTimeMillis() < tEnd) { + try { + sql("UPDATE test SET val = 2 ORDER BY id ASC"); + } + catch (Exception e) { + IgniteSQLException esql = X.cause(e, IgniteSQLException.class); + + if (esql == null || !esql.getMessage().contains("Failed to update some keys because they " + + "had been modified concurrently")) + throw e; + } + } + }); + + final IgniteInternalFuture futDesc = GridTestUtils.runAsync(() -> { + while (U.currentTimeMillis() < tEnd) { + while (U.currentTimeMillis() < tEnd) { + try { + sql("UPDATE test SET val = 3 ORDER BY id DESC"); + } + catch (Exception e) { + IgniteSQLException esql = X.cause(e, IgniteSQLException.class); + + if (esql == null || !esql.getMessage().contains("Failed to update some keys because they " + + "had been modified concurrently")) + throw e; + } + } + } + }); + + final IgniteInternalFuture futCache = GridTestUtils.runAsync(() -> { + while (U.currentTimeMillis() < tEnd) { + Map<Long, Long> map = new LinkedHashMap(); + + for (long i = KEY_CNT - 1; i >= 0; --i) + map.put(i, i); + + cache.putAll(map); + } + }); + + boolean deadlock = !GridTestUtils.waitForCondition( + () -> futAsc.isDone() && futDesc.isDone() && futCache.isDone(), + TEST_TIME + 5000); + + if (deadlock) { + futAsc.cancel(); + futDesc.cancel(); + futCache.cancel(); + + fail("Deadlock on DML"); + } + } + + /** + * @param mode Cache atomicity mode. + * @return Created test cache. + */ + private IgniteCache<Long, Long> createCache(CacheAtomicityMode mode) { + IgniteCache<Long, Long> c = grid().createCache(new CacheConfiguration<Long, Long>() + .setName("test") + .setSqlSchema("TEST") + .setAtomicityMode(mode) + .setQueryEntities(Collections.singleton(new QueryEntity(Long.class, Long.class) + .setTableName("test") + .addQueryField("id", Long.class.getName(), null) + .addQueryField("val", Long.class.getName(), null) + .setKeyFieldName("id") + .setValueFieldName("val") + )) + .setAffinity(new RendezvousAffinityFunction(false, 10))); + + for (long i = 0; i < KEY_CNT; ++i) + c.put(i, i); + + return c; + } + + /** + * @param sql SQL query. + * @param args Query parameters. + * @return Results cursor. + */ + private FieldsQueryCursor<List<?>> sql(String sql, Object... args) { + return grid().context().query().querySqlFields(new SqlFieldsQuery(sql) + .setSchema("TEST") + .setUpdateBatchSize(1) + .setArgs(args), false); + } +} diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlNotNullConstraintTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlNotNullConstraintTest.java index 8bd00f7..c19bd8b 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlNotNullConstraintTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlNotNullConstraintTest.java @@ -779,27 +779,10 @@ public class IgniteSqlNotNullConstraintTest extends AbstractIndexingCommonTest { /** */ private void checkNotNullCheckDmlInsertValues(CacheAtomicityMode atomicityMode) throws Exception { - executeSql("CREATE TABLE test(id INT PRIMARY KEY, name VARCHAR NOT NULL) WITH \"atomicity=" + executeSql("CREATE TABLE test(id INT PRIMARY KEY, name VARCHAR NOT NULL, age INT) WITH \"atomicity=" + atomicityMode.name() + "\""); - GridTestUtils.assertThrows(log(), new Callable<Object>() { - @Override public Object call() throws Exception { - executeSql("INSERT INTO test(id, name) " + - "VALUES (1, 'ok'), (2, NULLIF('a', 'a')), (3, 'ok')"); - - return null; - } - }, IgniteSQLException.class, ERR_MSG); - - List<List<?>> result = executeSql("SELECT id, name FROM test ORDER BY id"); - - assertEquals(0, result.size()); - - executeSql("INSERT INTO test(id, name) VALUES (1, 'ok'), (2, 'ok2'), (3, 'ok3')"); - - result = executeSql("SELECT id, name FROM test ORDER BY id"); - - assertEquals(3, result.size()); + checkNotNullInsertValues(); } /** */ @@ -821,10 +804,16 @@ public class IgniteSqlNotNullConstraintTest extends AbstractIndexingCommonTest { executeSql("ALTER TABLE test ADD COLUMN name VARCHAR NOT NULL"); + checkNotNullInsertValues(); + } + + /** + * @throws Exception If failed. + */ + private void checkNotNullInsertValues() throws Exception { GridTestUtils.assertThrows(log(), new Callable<Object>() { @Override public Object call() throws Exception { - executeSql("INSERT INTO test(id, name, age) " + - "VALUES (1, 'ok', 1), (2, NULLIF('a', 'a'), 2), (3, 'ok', 3)"); + executeSql("INSERT INTO test(id, name, age) VALUES (2, NULLIF('a', 'a'), 2)"); return null; } @@ -846,7 +835,7 @@ public class IgniteSqlNotNullConstraintTest extends AbstractIndexingCommonTest { public void testNotNullCheckDmlInsertFromSelect() throws Exception { executeSql("CREATE TABLE test(id INT PRIMARY KEY, name VARCHAR, age INT)"); - executeSql("INSERT INTO test(id, name, age) VALUES (1, 'Macy', 25), (2, null, 25), (3, 'John', 30)"); + executeSql("INSERT INTO test(id, name, age) VALUES (2, null, 25)"); GridTestUtils.assertThrows(log(), new Callable<Object>() { @Override public Object call() throws Exception { @@ -860,6 +849,7 @@ public class IgniteSqlNotNullConstraintTest extends AbstractIndexingCommonTest { assertEquals(0, result.size()); + executeSql("INSERT INTO test(id, name, age) VALUES (1, 'Macy', 25), (3, 'John', 30)"); executeSql("DELETE FROM test WHERE id = 2"); result = executeSql("INSERT INTO " + TABLE_PERSON + "(_key, name, age) " + "SELECT id, name, age FROM test"); @@ -905,9 +895,9 @@ public class IgniteSqlNotNullConstraintTest extends AbstractIndexingCommonTest { GridTestUtils.assertThrows(log(), new Callable<Object>() { @Override public Object call() throws Exception { - return executeSql("UPDATE dest" + - " p SET (name) = " + - "(SELECT name FROM src t WHERE p.id = t.id)"); + return executeSql("UPDATE dest p " + + "SET (name) = (SELECT name FROM src t WHERE p.id = t.id) " + + "WHERE p.id = 2"); } }, IgniteSQLException.class, ERR_MSG); diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java index 416cc9e..dcb6dfb 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java @@ -45,6 +45,7 @@ import org.apache.ignite.internal.processors.cache.index.DynamicIndexReplicatedT import org.apache.ignite.internal.processors.cache.query.ScanQueryOffheapExpiryPolicySelfTest; import org.apache.ignite.internal.processors.database.baseline.IgniteChangingBaselineCacheQueryNodeRestartSelfTest; import org.apache.ignite.internal.processors.database.baseline.IgniteStableBaselineCacheQueryNodeRestartsSelfTest; +import org.apache.ignite.internal.processors.query.DmlBatchSizeDeadlockTest; import org.apache.ignite.internal.processors.query.IgniteCacheGroupsCompareQueryTest; import org.apache.ignite.internal.processors.query.IgniteCacheGroupsSqlDistributedJoinSelfTest; import org.apache.ignite.internal.processors.query.IgniteCacheGroupsSqlSegmentedIndexMultiNodeSelfTest; @@ -141,6 +142,7 @@ import org.junit.runners.Suite; LocalQueryLazyTest.class, LongRunningQueryTest.class, + DmlBatchSizeDeadlockTest.class }) public class IgniteBinaryCacheQueryTestSuite2 { }
