IGNITE-9171: SQL: always execute queries in lazy mode. This closes #4514. This closes #4538. This closes #4870.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/de21856b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/de21856b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/de21856b Branch: refs/heads/ignite-2.7 Commit: de21856bcc904e3d67d4c15be75b9eb503cb2df5 Parents: 0c879bb Author: tledkov-gridgain <[email protected]> Authored: Thu Oct 11 14:34:20 2018 +0300 Committer: devozerov <[email protected]> Committed: Thu Oct 11 14:37:08 2018 +0300 ---------------------------------------------------------------------- .../internal/jdbc2/JdbcConnectionSelfTest.java | 12 +- .../jdbc/thin/JdbcThinConnectionSelfTest.java | 38 +- .../jdbc/thin/JdbcThinDataSourceSelfTest.java | 12 +- .../apache/ignite/IgniteSystemProperties.java | 7 +- .../ignite/cache/query/SqlFieldsQuery.java | 20 +- .../jdbc/thin/ConnectionPropertiesImpl.java | 2 +- .../ignite/internal/jdbc2/JdbcConnection.java | 2 +- .../ignite/internal/util/IgniteUtils.java | 2 +- .../query/h2/H2ConnectionWrapper.java | 11 + .../internal/processors/query/h2/H2Utils.java | 15 + .../processors/query/h2/IgniteH2Indexing.java | 180 ++++--- .../processors/query/h2/ObjectPool.java | 97 ++++ .../processors/query/h2/ObjectPoolReusable.java | 58 +++ .../query/h2/ThreadLocalObjectPool.java | 103 ---- .../processors/query/h2/dml/UpdatePlan.java | 8 +- .../query/h2/opt/GridH2QueryContext.java | 33 +- .../processors/query/h2/opt/GridH2Table.java | 133 ++++- .../query/h2/twostep/GridMapQueryExecutor.java | 498 ++++++++++--------- .../query/h2/twostep/GridResultPage.java | 7 +- .../query/h2/twostep/MapNodeResults.java | 13 +- .../query/h2/twostep/MapQueryLazyWorker.java | 223 +++++++-- .../query/h2/twostep/MapQueryResult.java | 34 +- .../query/h2/twostep/MapQueryResults.java | 40 +- ...GridCacheLazyQueryPartitionsReleaseTest.java | 2 - .../IgniteCacheQueryH2IndexingLeakTest.java | 9 +- ...butedQueryStopOnCancelOrTimeoutSelfTest.java | 6 +- 
...QueryNodeRestartDistributedJoinSelfTest.java | 14 +- ...ynamicColumnsAbstractConcurrentSelfTest.java | 6 +- .../cache/index/H2ConnectionLeaksSelfTest.java | 2 +- .../processors/query/LazyQuerySelfTest.java | 202 +++++++- .../processors/query/h2/ObjectPoolSelfTest.java | 125 +++++ .../query/h2/ThreadLocalObjectPoolSelfTest.java | 113 ----- .../h2/twostep/RetryCauseMessageSelfTest.java | 16 - .../IgniteCacheQuerySelfTestSuite.java | 4 +- .../ignite/cache/query/query_sql_fields.h | 4 +- .../cpp/odbc-test/src/configuration_test.cpp | 4 +- .../cpp/odbc/src/config/configuration.cpp | 2 +- .../Query/Linq/CacheLinqTest.Introspection.cs | 2 + .../Client/Cache/SqlQueryTest.cs | 4 +- .../Cache/Query/SqlFieldsQuery.cs | 43 +- ...benchmark-native-sql-cache-select.properties | 96 ++++ .../benchmark-native-sql-select.properties | 17 +- .../ignite-localhost-sql-query-config.xml | 91 ++++ .../yardstick/IgniteAbstractBenchmark.java | 30 +- .../yardstick/IgniteBenchmarkArguments.java | 13 + .../yardstick/jdbc/AbstractNativeBenchmark.java | 3 + .../apache/ignite/yardstick/jdbc/JdbcUtils.java | 47 +- .../jdbc/NativeSqlCacheQueryRangeBenchmark.java | 145 ++++++ .../jdbc/NativeSqlQueryRangeBenchmark.java | 13 +- 49 files changed, 1751 insertions(+), 810 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/de21856b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcConnectionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcConnectionSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcConnectionSelfTest.java index d560d74..db0a959 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcConnectionSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcConnectionSelfTest.java @@ -308,7 +308,7 
@@ public class JdbcConnectionSelfTest extends GridCommonAbstractTest { assertTrue(((JdbcConnection)conn).isEnforceJoinOrder()); assertFalse(((JdbcConnection)conn).isDistributedJoins()); assertFalse(((JdbcConnection)conn).isCollocatedQuery()); - assertFalse(((JdbcConnection)conn).isLazy()); + assertTrue(((JdbcConnection)conn).isLazy()); assertFalse(((JdbcConnection)conn).skipReducerOnUpdate()); } @@ -317,7 +317,7 @@ public class JdbcConnectionSelfTest extends GridCommonAbstractTest { assertFalse(((JdbcConnection)conn).isEnforceJoinOrder()); assertTrue(((JdbcConnection)conn).isDistributedJoins()); assertFalse(((JdbcConnection)conn).isCollocatedQuery()); - assertFalse(((JdbcConnection)conn).isLazy()); + assertTrue(((JdbcConnection)conn).isLazy()); assertFalse(((JdbcConnection)conn).skipReducerOnUpdate()); } @@ -326,15 +326,15 @@ public class JdbcConnectionSelfTest extends GridCommonAbstractTest { assertFalse(((JdbcConnection)conn).isEnforceJoinOrder()); assertFalse(((JdbcConnection)conn).isDistributedJoins()); assertTrue(((JdbcConnection)conn).isCollocatedQuery()); - assertFalse(((JdbcConnection)conn).isLazy()); + assertTrue(((JdbcConnection)conn).isLazy()); assertFalse(((JdbcConnection)conn).skipReducerOnUpdate()); } - try (final Connection conn = DriverManager.getConnection(CFG_URL_PREFIX + "lazy=true@" + configURL())) { + try (final Connection conn = DriverManager.getConnection(CFG_URL_PREFIX + "lazy=false@" + configURL())) { assertFalse(((JdbcConnection)conn).isEnforceJoinOrder()); assertFalse(((JdbcConnection)conn).isDistributedJoins()); assertFalse(((JdbcConnection)conn).isCollocatedQuery()); - assertTrue(((JdbcConnection)conn).isLazy()); + assertFalse(((JdbcConnection)conn).isLazy()); assertFalse(((JdbcConnection)conn).skipReducerOnUpdate()); } try (final Connection conn = DriverManager.getConnection(CFG_URL_PREFIX + "skipReducerOnUpdate=true@" @@ -342,7 +342,7 @@ public class JdbcConnectionSelfTest extends GridCommonAbstractTest { 
assertFalse(((JdbcConnection)conn).isEnforceJoinOrder()); assertFalse(((JdbcConnection)conn).isDistributedJoins()); assertFalse(((JdbcConnection)conn).isCollocatedQuery()); - assertFalse(((JdbcConnection)conn).isLazy()); + assertTrue(((JdbcConnection)conn).isLazy()); assertTrue(((JdbcConnection)conn).skipReducerOnUpdate()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/de21856b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java index 80397e6..26c34cf 100644 --- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java @@ -230,36 +230,36 @@ public class JdbcThinConnectionSelfTest extends JdbcThinAbstractSelfTest { */ public void testSqlHints() throws Exception { try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1")) { - assertHints(conn, false, false, false, false, false, false); + assertHints(conn, false, false, false, false, true, false); } try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?distributedJoins=true")) { - assertHints(conn, true, false, false, false, false, false); + assertHints(conn, true, false, false, false, true, false); } try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?enforceJoinOrder=true")) { - assertHints(conn, false, true, false, false, false, false); + assertHints(conn, false, true, false, false, true, false); } try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?collocated=true")) { - assertHints(conn, false, false, true, false, false, false); + assertHints(conn, false, false, true, 
false, true, false); } try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?replicatedOnly=true")) { - assertHints(conn, false, false, false, true, false, false); + assertHints(conn, false, false, false, true, true, false); } - try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?lazy=true")) { - assertHints(conn, false, false, false, false, true, false); + try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?lazy=false")) { + assertHints(conn, false, false, false, false, false, false); } try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?skipReducerOnUpdate=true")) { - assertHints(conn, false, false, false, false, false, true); + assertHints(conn, false, false, false, false, true, true); } try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?distributedJoins=true&" + - "enforceJoinOrder=true&collocated=true&replicatedOnly=true&lazy=true&skipReducerOnUpdate=true")) { - assertHints(conn, true, true, true, true, true, true); + "enforceJoinOrder=true&collocated=true&replicatedOnly=true&lazy=false&skipReducerOnUpdate=true")) { + assertHints(conn, true, true, true, true, false, true); } } @@ -270,32 +270,32 @@ public class JdbcThinConnectionSelfTest extends JdbcThinAbstractSelfTest { */ public void testSqlHintsSemicolon() throws Exception { try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1;distributedJoins=true")) { - assertHints(conn, true, false, false, false, false, false); + assertHints(conn, true, false, false, false, true, false); } try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1;enforceJoinOrder=true")) { - assertHints(conn, false, true, false, false, false, false); + assertHints(conn, false, true, false, false, true, false); } try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1;collocated=true")) { - assertHints(conn, false, false, 
true, false, false, false); + assertHints(conn, false, false, true, false, true, false); } try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1;replicatedOnly=true")) { - assertHints(conn, false, false, false, true, false, false); + assertHints(conn, false, false, false, true, true, false); } - try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1;lazy=true")) { - assertHints(conn, false, false, false, false, true, false); + try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1;lazy=false")) { + assertHints(conn, false, false, false, false, false, false); } try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1;skipReducerOnUpdate=true")) { - assertHints(conn, false, false, false, false, false, true); + assertHints(conn, false, false, false, false, true, true); } try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1;distributedJoins=true;" + - "enforceJoinOrder=true;collocated=true;replicatedOnly=true;lazy=true;skipReducerOnUpdate=true")) { - assertHints(conn, true, true, true, true, true, true); + "enforceJoinOrder=true;collocated=true;replicatedOnly=true;lazy=false;skipReducerOnUpdate=true")) { + assertHints(conn, true, true, true, true, false, true); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/de21856b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDataSourceSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDataSourceSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDataSourceSelfTest.java index 6040bed..834b4ca 100644 --- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDataSourceSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDataSourceSelfTest.java @@ -142,15 +142,15 @@ public class 
JdbcThinDataSourceSelfTest extends JdbcThinAbstractSelfTest { public void testResetUrl() throws Exception { IgniteJdbcThinDataSource ids = new IgniteJdbcThinDataSource(); - ids.setUrl("jdbc:ignite:thin://127.0.0.1:10800/test?lazy=true"); + ids.setUrl("jdbc:ignite:thin://127.0.0.1:10800/test?lazy=false"); assertEquals("test", ids.getSchema()); - assertTrue(ids.isLazy()); + assertFalse(ids.isLazy()); ids.setUrl("jdbc:ignite:thin://mydomain.org,localhost?collocated=true"); assertNull(ids.getSchema()); - assertFalse(ids.isLazy()); + assertTrue(ids.isLazy()); assertTrue(ids.isCollocated()); } @@ -168,7 +168,7 @@ public class JdbcThinDataSourceSelfTest extends JdbcThinAbstractSelfTest { assertFalse(io.connectionProperties().isAutoCloseServerCursor()); assertFalse(io.connectionProperties().isCollocated()); assertFalse(io.connectionProperties().isEnforceJoinOrder()); - assertFalse(io.connectionProperties().isLazy()); + assertTrue(io.connectionProperties().isLazy()); assertFalse(io.connectionProperties().isDistributedJoins()); assertFalse(io.connectionProperties().isReplicatedOnly()); } @@ -176,7 +176,7 @@ public class JdbcThinDataSourceSelfTest extends JdbcThinAbstractSelfTest { ids.setAutoCloseServerCursor(true); ids.setCollocated(true); ids.setEnforceJoinOrder(true); - ids.setLazy(true); + ids.setLazy(false); ids.setDistributedJoins(true); ids.setReplicatedOnly(true); @@ -186,7 +186,7 @@ public class JdbcThinDataSourceSelfTest extends JdbcThinAbstractSelfTest { assertTrue(io.connectionProperties().isAutoCloseServerCursor()); assertTrue(io.connectionProperties().isCollocated()); assertTrue(io.connectionProperties().isEnforceJoinOrder()); - assertTrue(io.connectionProperties().isLazy()); + assertFalse(io.connectionProperties().isLazy()); assertTrue(io.connectionProperties().isDistributedJoins()); assertTrue(io.connectionProperties().isReplicatedOnly()); } 
http://git-wip-us.apache.org/repos/asf/ignite/blob/de21856b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 4ecffbe..877b467 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -485,7 +485,12 @@ public final class IgniteSystemProperties { /** Disable fallback to H2 SQL parser if the internal SQL parser fails to parse the statement. */ public static final String IGNITE_SQL_PARSER_DISABLE_H2_FALLBACK = "IGNITE_SQL_PARSER_DISABLE_H2_FALLBACK"; - /** Force all SQL queries to be processed lazily regardless of what clients request. */ + /** + * Force all SQL queries to be processed lazily regardless of what clients request. + * + * @deprecated Since version 2.7. + */ + @Deprecated public static final String IGNITE_SQL_FORCE_LAZY_RESULT_SET = "IGNITE_SQL_FORCE_LAZY_RESULT_SET"; /** Disable SQL system views. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/de21856b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java index 4e12b8c..3e5c706 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java @@ -71,8 +71,8 @@ public class SqlFieldsQuery extends Query<List<?>> { /** */ private boolean replicatedOnly; - /** */ - private boolean lazy; + /** Lazy mode is default since Ignite v.2.7. 
*/ + private boolean lazy = true; /** Partitions for query */ private int[] parts; @@ -292,19 +292,24 @@ public class SqlFieldsQuery extends Query<List<?>> { /** * Sets lazy query execution flag. * <p> - * By default Ignite attempts to fetch the whole query result set to memory and send it to the client. For small - * and medium result sets this provides optimal performance and minimize duration of internal database locks, thus - * increasing concurrency. - * <p> * If result set is too big to fit in available memory this could lead to excessive GC pauses and even * OutOfMemoryError. Use this flag as a hint for Ignite to fetch result set lazily, thus minimizing memory * consumption at the cost of moderate performance hit. + * Now lazy mode is optimized for small and medium result set. Small result set means results rows count + * less than page size (see {@link #setPageSize}). * <p> - * Defaults to {@code false}, meaning that the whole result set is fetched to memory eagerly. + * For compatibility with previous version behavior lazy mode may be switched off. In this case Ignite attempts + * to fetch the whole query result set to memory and send it to the client. + * <p> + * Since version 2.7 lazy mode is used by default. + * Defaults to {@code true}, meaning that the result set is fetched lazily if it is possible. * * @param lazy Lazy query execution flag. * @return {@code this} For chaining. + * + * @deprecated Since Ignite 2.7. */ + @Deprecated public SqlFieldsQuery setLazy(boolean lazy) { this.lazy = lazy; @@ -318,6 +323,7 @@ public class SqlFieldsQuery extends Query<List<?>> { * * @return Lazy flag. 
*/ + @Deprecated public boolean isLazy() { return lazy; } http://git-wip-us.apache.org/repos/asf/ignite/blob/de21856b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java index 51a3837..054807a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java @@ -84,7 +84,7 @@ public class ConnectionPropertiesImpl implements ConnectionProperties, Serializa /** Lazy query execution property. */ private BooleanProperty lazy = new BooleanProperty( - "lazy", "Enable lazy query execution", false, false); + "lazy", "Enable lazy query execution (lazy mode is used by default since v.2.7)", true, false); /** Socket send buffer size property. 
*/ private IntegerProperty socketSendBuffer = new IntegerProperty( http://git-wip-us.apache.org/repos/asf/ignite/blob/de21856b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java index c589c06..481794e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java @@ -195,7 +195,7 @@ public class JdbcConnection implements Connection { collocatedQry = Boolean.parseBoolean(props.getProperty(PROP_COLLOCATED)); distributedJoins = Boolean.parseBoolean(props.getProperty(PROP_DISTRIBUTED_JOINS)); enforceJoinOrder = Boolean.parseBoolean(props.getProperty(PROP_ENFORCE_JOIN_ORDER)); - lazy = Boolean.parseBoolean(props.getProperty(PROP_LAZY)); + lazy = Boolean.parseBoolean(props.getProperty(PROP_LAZY, "true")); txAllowed = Boolean.parseBoolean(props.getProperty(PROP_TX_ALLOWED)); stream = Boolean.parseBoolean(props.getProperty(PROP_STREAMING)); http://git-wip-us.apache.org/repos/asf/ignite/blob/de21856b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index 3ffbb00..5f397d5 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -4054,7 +4054,7 @@ public abstract class IgniteUtils { rsrc.close(); } catch (Exception e) { - warn(log, "Failed to close resource: " + e.getMessage()); + warn(log, "Failed to close resource: 
" + e.getMessage(), e); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/de21856b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ConnectionWrapper.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ConnectionWrapper.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ConnectionWrapper.java index 425015a..020cd5e 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ConnectionWrapper.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ConnectionWrapper.java @@ -33,6 +33,9 @@ public class H2ConnectionWrapper implements AutoCloseable { private final Connection conn; /** */ + private final Thread intiThread; + + /** */ private volatile String schema; /** */ @@ -43,6 +46,7 @@ public class H2ConnectionWrapper implements AutoCloseable { */ H2ConnectionWrapper(Connection conn) { this.conn = conn; + intiThread = Thread.currentThread(); initStatementCache(); } @@ -96,6 +100,13 @@ public class H2ConnectionWrapper implements AutoCloseable { statementCache = new H2StatementCache(STATEMENT_CACHE_SIZE); } + /** + * @return Thread where the connection was created. 
+ */ + public Thread initialThread() { + return intiThread; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(H2ConnectionWrapper.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/de21856b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java index b9d9d8e..074a3e4 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java @@ -237,10 +237,25 @@ public class H2Utils { * @param enforceJoinOrder Enforce join order of tables. */ public static void setupConnection(Connection conn, boolean distributedJoins, boolean enforceJoinOrder) { + setupConnection(conn,distributedJoins, enforceJoinOrder, false); + } + + /** + * @param conn Connection to use. + * @param distributedJoins If distributed joins are enabled. + * @param enforceJoinOrder Enforce join order of tables. + * @param lazy Lazy query execution mode. 
+ */ + public static void setupConnection( + Connection conn, + boolean distributedJoins, + boolean enforceJoinOrder, + boolean lazy) { Session s = session(conn); s.setForceJoinOrder(enforceJoinOrder); s.setJoinBatchEnabled(distributedJoins); + s.setLazyQueryExecution(lazy); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/de21856b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index fe563b7..b90e670 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -136,7 +136,6 @@ import org.apache.ignite.internal.processors.query.h2.sys.view.SqlSystemViewNode import org.apache.ignite.internal.processors.query.h2.sys.view.SqlSystemViewNodes; import org.apache.ignite.internal.processors.query.h2.twostep.GridMapQueryExecutor; import org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor; -import org.apache.ignite.internal.processors.query.h2.twostep.MapQueryLazyWorker; import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest; import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor; import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure; @@ -297,9 +296,9 @@ public class IgniteH2Indexing implements GridQueryIndexing { /** */ private String dbUrl = "jdbc:h2:mem:"; - /** */ + /** All connections are used by Ignite instance. Map of (H2ConnectionWrapper, Boolean) is used as a Set. 
*/ // TODO https://issues.apache.org/jira/browse/IGNITE-9062 - private final ConcurrentMap<Thread, H2ConnectionWrapper> conns = new ConcurrentHashMap<>(); + private final ConcurrentMap<Thread, ConcurrentHashMap<H2ConnectionWrapper, Boolean>> conns = new ConcurrentHashMap<>(); /** */ private GridMapQueryExecutor mapQryExec; @@ -327,13 +326,23 @@ public class IgniteH2Indexing implements GridQueryIndexing { /** */ // TODO https://issues.apache.org/jira/browse/IGNITE-9062 - private final ThreadLocalObjectPool<H2ConnectionWrapper> connectionPool = new ThreadLocalObjectPool<>(IgniteH2Indexing.this::newConnectionWrapper, 5); + private final ThreadLocal<ObjectPool<H2ConnectionWrapper>> connectionPool + = new ThreadLocal<ObjectPool<H2ConnectionWrapper>>() { + @Override protected ObjectPool<H2ConnectionWrapper> initialValue() { + return new ObjectPool<>( + IgniteH2Indexing.this::newConnectionWrapper, + 50, + IgniteH2Indexing.this::closePooledConnectionWrapper, + IgniteH2Indexing.this::recycleConnection); + } + }; /** */ // TODO https://issues.apache.org/jira/browse/IGNITE-9062 - private final ThreadLocal<ThreadLocalObjectPool.Reusable<H2ConnectionWrapper>> connCache = new ThreadLocal<ThreadLocalObjectPool.Reusable<H2ConnectionWrapper>>() { - @Override public ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> get() { - ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> reusable = super.get(); + private final ThreadLocal<ObjectPoolReusable<H2ConnectionWrapper>> connCache + = new ThreadLocal<ObjectPoolReusable<H2ConnectionWrapper>>() { + @Override public ObjectPoolReusable<H2ConnectionWrapper> get() { + ObjectPoolReusable<H2ConnectionWrapper> reusable = super.get(); boolean reconnect = true; @@ -353,10 +362,21 @@ public class IgniteH2Indexing implements GridQueryIndexing { return reusable; } - @Override protected ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> initialValue() { - ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> reusableConnection = 
connectionPool.borrow(); + @Override protected ObjectPoolReusable<H2ConnectionWrapper> initialValue() { + ObjectPool<H2ConnectionWrapper> pool = connectionPool.get(); + + ObjectPoolReusable<H2ConnectionWrapper> reusableConnection = pool.borrow(); + + ConcurrentHashMap<H2ConnectionWrapper, Boolean> perThreadConns = conns.get(Thread.currentThread()); + + ConcurrentHashMap<H2ConnectionWrapper, Boolean> newMap = new ConcurrentHashMap<>(); + + perThreadConns = conns.putIfAbsent(Thread.currentThread(), newMap); + + if (perThreadConns == null) + perThreadConns = newMap; - conns.put(Thread.currentThread(), reusableConnection.object()); + perThreadConns.put(reusableConnection.object(), false); return reusableConnection; } @@ -436,16 +456,54 @@ public class IgniteH2Indexing implements GridQueryIndexing { return sysConn; } - /** */ + /** + * @return Connection wrapper. + */ private H2ConnectionWrapper newConnectionWrapper() { try { - return new H2ConnectionWrapper(DriverManager.getConnection(dbUrl)); + Connection c = DriverManager.getConnection(dbUrl); + return new H2ConnectionWrapper(c); } catch (SQLException e) { throw new IgniteSQLException("Failed to initialize DB connection: " + dbUrl, e); } } /** + * @param conn Connection wrapper to close. + */ + private void closePooledConnectionWrapper(H2ConnectionWrapper conn) { + conns.get(conn.initialThread()).remove(conn); + + U.closeQuiet(conn); + } + + /** + * Removes from threadlocal cache and returns associated with current thread connection. + * @return Connection associated with current thread. + */ + public ObjectPoolReusable<H2ConnectionWrapper> detachConnection() { + ObjectPoolReusable<H2ConnectionWrapper> reusableConnection = connCache.get(); + + connCache.remove(); + + conns.get(Thread.currentThread()).remove(reusableConnection.object()); + + return reusableConnection; + } + + /** + * Return connection to the global connection collection. + * @param conn Recycled connection. 
+ */ + private void recycleConnection(H2ConnectionWrapper conn) { + ConcurrentMap<H2ConnectionWrapper, Boolean> perThreadConns = conns.get(conn.initialThread()); + + // May be null when node is stopping. + if (perThreadConns != null) + perThreadConns.put(conn, false); + } + + /** * @param c Connection. * @param sql SQL. * @return <b>Cached</b> prepared statement. @@ -737,12 +795,12 @@ public class IgniteH2Indexing implements GridQueryIndexing { * Handles SQL exception. */ private void onSqlException() { - Connection conn = connCache.get().object().connection(); + H2ConnectionWrapper conn = connCache.get().object(); connCache.set(null); if (conn != null) { - conns.remove(Thread.currentThread()); + conns.get(Thread.currentThread()).remove(conn); // Reset connection to receive new one at next call. U.close(conn, log); @@ -1389,32 +1447,14 @@ public class IgniteH2Indexing implements GridQueryIndexing { */ private ResultSet executeSqlQuery(final Connection conn, final PreparedStatement stmt, int timeoutMillis, @Nullable GridQueryCancel cancel) throws IgniteCheckedException { - final MapQueryLazyWorker lazyWorker = MapQueryLazyWorker.currentWorker(); - - if (cancel != null) { - cancel.set(new Runnable() { - @Override public void run() { - if (lazyWorker != null) { - lazyWorker.submit(new Runnable() { - @Override public void run() { - cancelStatement(stmt); - } - }); - } - else - cancelStatement(stmt); - } - }); - } + if (cancel != null) + cancel.set(() -> cancelStatement(stmt)); Session ses = H2Utils.session(conn); if (timeoutMillis > 0) ses.setQueryTimeout(timeoutMillis); - if (lazyWorker != null) - ses.setLazyQueryExecution(true); - try { return stmt.executeQuery(); } @@ -1428,9 +1468,6 @@ public class IgniteH2Indexing implements GridQueryIndexing { finally { if (timeoutMillis > 0) ses.setQueryTimeout(0); - - if (lazyWorker != null) - ses.setLazyQueryExecution(false); } } @@ -2540,6 +2577,11 @@ public class IgniteH2Indexing implements GridQueryIndexing { topVer, 
mvccSnapshot); } + /** + * @param flags Flags holder. + * @param flag Flag mask to check. + * @return {@code true} if flag is set, otherwise returns {@code false}. + */ private boolean isFlagSet(int flags, int flag) { return (flags & flag) == flag; } @@ -3017,18 +3059,24 @@ public class IgniteH2Indexing implements GridQueryIndexing { private void cleanupStatementCache() { long now = U.currentTimeMillis(); - for (Iterator<Map.Entry<Thread, H2ConnectionWrapper>> it = conns.entrySet().iterator(); it.hasNext(); ) { - Map.Entry<Thread, H2ConnectionWrapper> entry = it.next(); + for (Iterator<Map.Entry<Thread, ConcurrentHashMap<H2ConnectionWrapper, Boolean>>> it + = conns.entrySet().iterator(); it.hasNext(); ) { + Map.Entry<Thread, ConcurrentHashMap<H2ConnectionWrapper, Boolean>> entry = it.next(); Thread t = entry.getKey(); if (t.getState() == Thread.State.TERMINATED) { - U.close(entry.getValue(), log); + for (H2ConnectionWrapper c : entry.getValue().keySet()) + U.close(c, log); it.remove(); } - else if (now - entry.getValue().statementCache().lastUsage() > STATEMENT_CACHE_THREAD_USAGE_TIMEOUT) - entry.getValue().clearStatementCache(); + else { + for (H2ConnectionWrapper c : entry.getValue().keySet()) { + if (now - c.statementCache().lastUsage() > STATEMENT_CACHE_THREAD_USAGE_TIMEOUT) + c.clearStatementCache(); + } + } } } @@ -3036,13 +3084,15 @@ public class IgniteH2Indexing implements GridQueryIndexing { * Called periodically by {@link GridTimeoutProcessor} to clean up the {@link #conns}. 
*/ private void cleanupConnections() { - for (Iterator<Map.Entry<Thread, H2ConnectionWrapper>> it = conns.entrySet().iterator(); it.hasNext(); ) { - Map.Entry<Thread, H2ConnectionWrapper> entry = it.next(); + for (Iterator<Map.Entry<Thread, ConcurrentHashMap<H2ConnectionWrapper, Boolean>>> it + = conns.entrySet().iterator(); it.hasNext(); ) { + Map.Entry<Thread, ConcurrentHashMap<H2ConnectionWrapper, Boolean>> entry = it.next(); Thread t = entry.getKey(); if (t.getState() == Thread.State.TERMINATED) { - U.close(entry.getValue(), log); + for (H2ConnectionWrapper c : entry.getValue().keySet()) + U.close(c, log); it.remove(); } @@ -3050,24 +3100,6 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** - * Removes from cache and returns associated with current thread connection. - * @return Connection associated with current thread. - */ - public ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> detach() { - Thread key = Thread.currentThread(); - - ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> reusableConnection = connCache.get(); - - H2ConnectionWrapper connection = conns.remove(key); - - connCache.remove(); - - assert reusableConnection.object().connection() == connection.connection(); - - return reusableConnection; - } - - /** * Rebuild indexes from hash index. * * @param cacheName Cache name. 
@@ -3431,10 +3463,15 @@ public class IgniteH2Indexing implements GridQueryIndexing { if (log.isDebugEnabled()) log.debug("Stopping cache query index..."); - mapQryExec.cancelLazyWorkers(); + mapQryExec.stop(); + + for (ConcurrentMap<H2ConnectionWrapper, Boolean> perThreadConns : conns.values()) { + for (H2ConnectionWrapper c : perThreadConns.keySet()) + U.close(c, log); + } - for (H2ConnectionWrapper c : conns.values()) - U.close(c, log); + connectionPool.remove(); + connCache.remove(); conns.clear(); schemas.clear(); @@ -3543,7 +3580,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { } } - conns.values().forEach(H2ConnectionWrapper::clearStatementCache); + conns.values().forEach(map -> map.keySet().forEach(H2ConnectionWrapper::clearStatementCache)); for (H2TableDescriptor tbl : rmvTbls) { for (Index idx : tbl.table().getIndexes()) @@ -3701,10 +3738,10 @@ public class IgniteH2Indexing implements GridQueryIndexing { /** {@inheritDoc} */ @Override public void cancelAllQueries() { - mapQryExec.cancelLazyWorkers(); - - for (H2ConnectionWrapper c : conns.values()) - U.close(c, log); + for (ConcurrentHashMap<H2ConnectionWrapper, Boolean> perThreadConns : conns.values()) { + for (H2ConnectionWrapper c : perThreadConns.keySet()) + U.close(c, log); + } } /** @@ -3754,6 +3791,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** + * @param twoStepQry Query. * @return {@code True} is system views exist. 
*/ private boolean hasSystemViews(GridCacheTwoStepQuery twoStepQry) { http://git-wip-us.apache.org/repos/asf/ignite/blob/de21856b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ObjectPool.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ObjectPool.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ObjectPool.java new file mode 100644 index 0000000..9d2a580 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ObjectPool.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2; + +import org.apache.ignite.internal.util.typedef.internal.U; + +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.function.Consumer; +import java.util.function.Supplier; + +/** + * Thread-safe pool for managing limited number objects for further reuse. + * + * @param <E> Pooled objects type. 
+ */ +public final class ObjectPool<E extends AutoCloseable> { + /** */ + private final Supplier<E> objectFactory; + + /** */ + private final ConcurrentLinkedQueue<E> bag = new ConcurrentLinkedQueue<>(); + + /** */ + private final int poolSize; + + /** The function to close object. */ + private final Consumer<E> closer; + + /** The listener is called when object is returned to the pool. */ + private final Consumer<E> recycler; + + /** + * @param objectFactory Factory used for new objects creation. + * @param poolSize Number of objects which pool can contain. + * @param closer Function to close object. + * @param recycler The listener is called when object is returned to the pool. + */ + public ObjectPool(Supplier<E> objectFactory, int poolSize, Consumer<E> closer, Consumer<E> recycler) { + this.objectFactory = objectFactory; + this.poolSize = poolSize; + this.closer = closer != null ? closer : U::closeQuiet; + this.recycler = recycler; + } + + /** + * Picks an object from the pool if one is present or creates new one otherwise. + * Returns an object wrapper which could be returned to the pool. + * + * @return Reusable object wrapper. + */ + public ObjectPoolReusable<E> borrow() { + E pooled = bag.poll(); + + return new ObjectPoolReusable<>(this, pooled != null ? pooled : objectFactory.get()); + } + + /** + * Recycles an object. + * + * @param object Object. + */ + void recycle(E object) { + assert object != null : "Already recycled"; + + if (bag.size() < poolSize) { + bag.add(object); + + if (recycler != null) + recycler.accept(object); + } + else + closer.accept(object); + } + + /** + * Visible for test + * @return Pool bag size. 
+ */ + int bagSize() { + return bag.size(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/de21856b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ObjectPoolReusable.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ObjectPoolReusable.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ObjectPoolReusable.java new file mode 100644 index 0000000..48fee42 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ObjectPoolReusable.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2; + +/** + * Wrapper for a pooled object with capability to return the object to a pool. + * + * @param <T> Enclosed object type. + */ +public class ObjectPoolReusable<T extends AutoCloseable> { + /** Object pool to recycle. */ + private final ObjectPool<T> pool; + + /** Detached object. */ + private T object; + + /** + * @param pool Object pool. + * @param object Detached object. 
+ */ + ObjectPoolReusable(ObjectPool<T> pool, T object) { + this.pool = pool; + this.object = object; + } + + /** + * @return Enclosed object. + */ + public T object() { + return object; + } + + /** + * Returns an object to a pool or closes it if the pool is already full. + */ + public void recycle() { + assert object != null : "Already recycled"; + + pool.recycle(object); + + object = null; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/de21856b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ThreadLocalObjectPool.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ThreadLocalObjectPool.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ThreadLocalObjectPool.java deleted file mode 100644 index 25daa23..0000000 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ThreadLocalObjectPool.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.query.h2; - -import java.util.LinkedList; -import java.util.Queue; -import java.util.function.Supplier; -import org.apache.ignite.internal.util.typedef.internal.U; - -/** - * Special pool for managing limited number objects for further reuse. - * This pool maintains separate object bag for each thread by means of {@link ThreadLocal}. - * <p> - * If object is borrowed on one thread and recycled on different then it will be returned to - * recycling thread bag. For thread-safe use either pooled objects should be thread-safe or - * <i>happens-before</i> should be established between borrowing object and subsequent recycling. - * - * @param <E> pooled objects type - */ -public final class ThreadLocalObjectPool<E extends AutoCloseable> { - /** - * Wrapper for a pooled object with capability to return the object to a pool. - * - * @param <T> enclosed object type - */ - public static class Reusable<T extends AutoCloseable> { - /** */ - private final ThreadLocalObjectPool<T> pool; - /** */ - private final T object; - - /** */ - private Reusable(ThreadLocalObjectPool<T> pool, T object) { - this.pool = pool; - this.object = object; - } - - /** - * @return enclosed object - */ - public T object() { - return object; - } - - /** - * Returns an object to a pool or closes it if the pool is already full. 
- */ - public void recycle() { - Queue<Reusable<T>> bag = pool.bag.get(); - if (bag.size() < pool.poolSize) - bag.add(this); - else - U.closeQuiet(object); - } - } - - /** */ - private final Supplier<E> objectFactory; - /** */ - private final ThreadLocal<Queue<Reusable<E>>> bag = ThreadLocal.withInitial(LinkedList::new); - /** */ - private final int poolSize; - - /** - * @param objectFactory factory used for new objects creation - * @param poolSize number of objects which pool can contain - */ - public ThreadLocalObjectPool(Supplier<E> objectFactory, int poolSize) { - this.objectFactory = objectFactory; - this.poolSize = poolSize; - } - - /** - * Picks an object from the pool if one is present or creates new one otherwise. - * Returns an object wrapper which could be returned to the pool. - * - * @return reusable object wrapper - */ - public Reusable<E> borrow() { - Reusable<E> pooled = bag.get().poll(); - return pooled != null ? pooled : new Reusable<>(this, objectFactory.get()); - } - - /** Visible for test */ - int bagSize() { - return bag.get().size(); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/de21856b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java index ba4b12b..31a444e 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java @@ -36,7 +36,7 @@ import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.processors.query.UpdateSourceIterator; import org.apache.ignite.internal.processors.query.h2.H2ConnectionWrapper; import 
org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; -import org.apache.ignite.internal.processors.query.h2.ThreadLocalObjectPool; +import org.apache.ignite.internal.processors.query.h2.ObjectPoolReusable; import org.apache.ignite.internal.processors.query.h2.UpdateResult; import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor; import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; @@ -623,7 +623,7 @@ public final class UpdatePlan { private final EnlistOperation op; /** */ - private volatile ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> conn; + private volatile ObjectPoolReusable<H2ConnectionWrapper> conn; /** * @param idx Indexing. @@ -647,7 +647,7 @@ public final class UpdatePlan { /** {@inheritDoc} */ @Override public void beforeDetach() { - ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> conn0 = conn = idx.detach(); + ObjectPoolReusable<H2ConnectionWrapper> conn0 = conn = idx.detachConnection(); if (isClosed()) conn0.recycle(); @@ -657,7 +657,7 @@ public final class UpdatePlan { @Override protected void onClose() { cur.close(); - ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> conn0 = conn; + ObjectPoolReusable<H2ConnectionWrapper> conn0 = conn; if (conn0 != null) conn0.recycle(); http://git-wip-us.apache.org/repos/asf/ignite/blob/de21856b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java index f12c0f3..9971b78 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java @@ -18,8 
+18,10 @@ package org.apache.ignite.internal.processors.query.h2.opt; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -89,7 +91,7 @@ public class GridH2QueryContext { private MvccSnapshot mvccSnapshot; /** */ - private MapQueryLazyWorker lazyWorker; + private Set<GridH2Table> lockedTables = new HashSet<>(); /** * @param locNodeId Local node ID. @@ -351,7 +353,8 @@ public class GridH2QueryContext { assert qctx.get() == null; // We need MAP query context to be available to other threads to run distributed joins. - if (x.key.type == MAP && x.distributedJoinMode() != OFF && qctxs.putIfAbsent(x.key, x) != null) + if (x.key.type == MAP && x.distributedJoinMode() != OFF && qctxs.putIfAbsent(x.key, x) != null + && MapQueryLazyWorker.currentWorker() == null) throw new IllegalStateException("Query context is already set."); qctx.set(x); @@ -401,10 +404,7 @@ public class GridH2QueryContext { assert x.key.equals(key); - if (x.lazyWorker() != null) - x.lazyWorker().stop(nodeStop); - else - x.clearContext(nodeStop); + x.clearContext(nodeStop); return true; } @@ -413,7 +413,10 @@ public class GridH2QueryContext { * @param nodeStop Node is stopping. */ @SuppressWarnings("ForLoopReplaceableByForEach") - public void clearContext(boolean nodeStop) { + public synchronized void clearContext(boolean nodeStop) { + if (cleared) + return; + cleared = true; List<GridReservable> r = reservations; @@ -516,20 +519,10 @@ public class GridH2QueryContext { } /** - * @return Lazy worker, if any, or {@code null} if none. + * @return The set of tables have been locked by current thread. */ - public MapQueryLazyWorker lazyWorker() { - return lazyWorker; - } - - /** - * @param lazyWorker Lazy worker, if any, or {@code null} if none. - * @return {@code this}. 
- */ - public GridH2QueryContext lazyWorker(MapQueryLazyWorker lazyWorker) { - this.lazyWorker = lazyWorker; - - return this; + public Set<GridH2Table> lockedTables() { + return lockedTables; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/de21856b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java index a612b63..709ded7 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java @@ -17,17 +17,6 @@ package org.apache.ignite.internal.processors.query.h2.opt; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.LongAdder; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -37,7 +26,7 @@ import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.internal.processors.query.QueryField; import org.apache.ignite.internal.processors.query.h2.database.H2RowFactory; import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndex; -import org.apache.ignite.internal.processors.query.h2.twostep.GridMapQueryExecutor; +import 
org.apache.ignite.internal.processors.query.h2.twostep.MapQueryLazyWorker; import org.apache.ignite.internal.util.typedef.F; import org.h2.command.ddl.CreateTableData; import org.h2.command.dml.Insert; @@ -58,6 +47,19 @@ import org.h2.table.TableType; import org.h2.value.DataType; import org.jetbrains.annotations.Nullable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.LongAdder; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.DEFAULT_COLUMNS_COUNT; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.KEY_COL; @@ -90,6 +92,12 @@ public class GridH2Table extends TableBase { /** */ private final ReadWriteLock lock; + /** Number of reading threads which currently move execution from query pool to dedicated thread. */ + private final AtomicInteger lazyTransferCnt = new AtomicInteger(); + + /** Has writer that waits lock in the loop. */ + private volatile boolean hasWaitedWriter; + /** */ private boolean destroyed; @@ -265,6 +273,11 @@ public class GridH2Table extends TableBase { ses.addLock(this); + GridH2QueryContext qctx = GridH2QueryContext.get(); + + if (qctx != null) + qctx.lockedTables().add(this); + return false; } @@ -291,15 +304,44 @@ public class GridH2Table extends TableBase { Lock l = exclusive ? lock.writeLock() : lock.readLock(); try { - if (!exclusive || !GridMapQueryExecutor.FORCE_LAZY) - l.lockInterruptibly(); - else { + if (exclusive) { + // Attempting to obtain exclusive lock for DDL. 
+ // Lock is considered acquired only if "lazyTransferCnt" is zero, meaning that + // currently there are no reader threads moving query execution from query + // pool to dedicated thread. + // It is possible that reader which is currently transferring execution gets + // queued after the write lock we are trying to acquire. So we use timed waiting + // and a loop to avoid deadlocks. for (;;) { - if (l.tryLock(200, TimeUnit.MILLISECONDS)) - break; - else - Thread.yield(); + if (l.tryLock(200, TimeUnit.MILLISECONDS)) { + if (lazyTransferCnt.get() == 0) + break; + else + l.unlock(); + } + + hasWaitedWriter = true; + + Thread.yield(); } + + hasWaitedWriter = false; + } + else { + // Attempt to acquire read lock (query execution, DML, cache update). + // If query is being executed inside a query pool, we do not want it to be blocked + // for a long time, as it would prevent other queries from being executed. So we + // wait a little and then force transfer to dedicated thread by throwing special + // timeout exception.GridNioSslSelfTest + // If query is not in the query pool, then we simply wait for lock acquisition. + if (isSqlNotInLazy()) { + if (hasWaitedWriter || !l.tryLock(200, TimeUnit.MILLISECONDS)) { + throw new GridH2RetryException("Long wait on Table lock: [tableName=" + getName() + + ", hasWaitedWriter=" + hasWaitedWriter + ']'); + } + } + else + l.lockInterruptibly(); } } catch (InterruptedException e) { @@ -321,6 +363,49 @@ public class GridH2Table extends TableBase { } /** + * Check if table is being locked in not lazy thread by SQL query. + * + * @return {@code True} if is in query pool. + */ + private static boolean isSqlNotInLazy() { + return GridH2QueryContext.get() != null && MapQueryLazyWorker.currentWorker() == null; + } + + /** + * Callback invoked when session is to be transferred to lazy thread. In order to prevent concurrent changes + * by DDL during move we increment counter before releasing read lock. + * + * @param ses Session. 
+ */ + public void onLazyTransferStarted(Session ses) { + assert sessions.containsKey(ses) : "Detached session have not locked the table: " + getName(); + + lazyTransferCnt.incrementAndGet(); + + lock.readLock().unlock(); + } + + /** + * Callback invoked when lazy transfer finished. Acquire the lock, decrement transfer counter. + * + * @param ses Session to detach. + */ + public void onLazyTransferFinished(Session ses) { + assert sessions.containsKey(ses) : "Attached session have not locked the table: " + getName(); + + try { + lock.readLock().lockInterruptibly(); + + lazyTransferCnt.decrementAndGet(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new IgniteInterruptedException("Thread got interrupted while trying to acquire table lock.", e); + } + } + + /** * Check if table is not destroyed. */ private void ensureNotDestroyed() { @@ -410,6 +495,11 @@ public class GridH2Table extends TableBase { if (exclusive == null) return; + GridH2QueryContext qctx = GridH2QueryContext.get(); + + if (qctx != null) + qctx.lockedTables().remove(this); + unlock(exclusive); } @@ -949,9 +1039,10 @@ public class GridH2Table extends TableBase { } /** + * Drop columns. * - * @param cols - * @param ifExists + * @param cols Columns. + * @param ifExists IF EXISTS flag. */ public void dropColumns(List<String> cols, boolean ifExists) { assert !ifExists || cols.size() == 1;
