IGNITE-9960: SQL: Reverted IGNITE-9171 and IGNITE-9864 until performance is fixed. This closes #5045.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0ccde7c4 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0ccde7c4 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0ccde7c4 Branch: refs/heads/ignite-2.7 Commit: 0ccde7c429df12aef07a617ca38fd48df2f92b0a Parents: 42ab0eb Author: devozerov <[email protected]> Authored: Tue Oct 23 10:33:26 2018 +0300 Committer: devozerov <[email protected]> Committed: Tue Oct 23 10:35:00 2018 +0300 ---------------------------------------------------------------------- .../internal/jdbc2/JdbcConnectionSelfTest.java | 12 +- .../jdbc/thin/JdbcThinConnectionSelfTest.java | 38 +- .../jdbc/thin/JdbcThinDataSourceSelfTest.java | 12 +- .../apache/ignite/IgniteSystemProperties.java | 7 +- .../ignite/cache/query/SqlFieldsQuery.java | 20 +- .../jdbc/thin/ConnectionPropertiesImpl.java | 2 +- .../ignite/internal/jdbc2/JdbcConnection.java | 2 +- .../ignite/internal/util/IgniteUtils.java | 2 +- .../query/h2/H2ConnectionWrapper.java | 11 - .../internal/processors/query/h2/H2Utils.java | 15 - .../processors/query/h2/IgniteH2Indexing.java | 186 +++---- .../processors/query/h2/ObjectPool.java | 97 ---- .../processors/query/h2/ObjectPoolReusable.java | 58 --- .../query/h2/ThreadLocalObjectPool.java | 103 ++++ .../processors/query/h2/dml/UpdatePlan.java | 8 +- .../query/h2/opt/GridH2QueryContext.java | 33 +- .../processors/query/h2/opt/GridH2Table.java | 133 +---- .../query/h2/twostep/GridMapQueryExecutor.java | 498 +++++++++---------- .../h2/twostep/GridReduceQueryExecutor.java | 16 +- .../query/h2/twostep/GridResultPage.java | 7 +- .../query/h2/twostep/MapNodeResults.java | 13 +- .../query/h2/twostep/MapQueryLazyWorker.java | 223 ++------- .../query/h2/twostep/MapQueryResult.java | 34 +- .../query/h2/twostep/MapQueryResults.java | 40 +- ...GridCacheLazyQueryPartitionsReleaseTest.java | 2 + .../IgniteCacheQueryH2IndexingLeakTest.java | 9 +- ...butedQueryStopOnCancelOrTimeoutSelfTest.java | 43 +- ...cheQueryAbstractDistributedJoinSelfTest.java | 5 - ...QueryNodeRestartDistributedJoinSelfTest.java | 14 +- ...nCancelOrTimeoutDistributedJoinSelfTest.java | 23 +- ...ynamicColumnsAbstractConcurrentSelfTest.java | 6 +- .../cache/index/H2ConnectionLeaksSelfTest.java | 2 +- .../processors/query/LazyQuerySelfTest.java | 202 +------- .../processors/query/h2/ObjectPoolSelfTest.java | 125 ----- .../query/h2/ThreadLocalObjectPoolSelfTest.java | 113 +++++ .../h2/twostep/RetryCauseMessageSelfTest.java | 16 + .../IgniteCacheQuerySelfTestSuite.java | 4 +- .../ignite/cache/query/query_sql_fields.h | 4 +- .../cpp/odbc-test/src/configuration_test.cpp | 4 +- .../cpp/odbc/src/config/configuration.cpp | 2 +- .../Cache/Query/CacheQueriesTest.cs | 6 +- .../Query/Linq/CacheLinqTest.Introspection.cs | 2 - .../Client/Cache/SqlQueryTest.cs | 4 +- .../Cache/Query/SqlFieldsQuery.cs | 43 +- ...benchmark-native-sql-cache-select.properties | 96 ---- .../benchmark-native-sql-select.properties | 17 +- .../ignite-localhost-sql-query-config.xml | 91 ---- .../yardstick/IgniteAbstractBenchmark.java | 30 +- .../yardstick/IgniteBenchmarkArguments.java | 13 - .../yardstick/jdbc/AbstractNativeBenchmark.java | 3 - .../apache/ignite/yardstick/jdbc/JdbcUtils.java | 47 +- .../jdbc/NativeSqlCacheQueryRangeBenchmark.java | 145 ------ .../jdbc/NativeSqlQueryRangeBenchmark.java | 13 +- 53 files changed, 848 insertions(+), 1806 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0ccde7c4/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcConnectionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcConnectionSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcConnectionSelfTest.java index db0a959..d560d74 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcConnectionSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcConnectionSelfTest.java @@ -308,7 +308,7 @@ public class JdbcConnectionSelfTest extends GridCommonAbstractTest { assertTrue(((JdbcConnection)conn).isEnforceJoinOrder()); assertFalse(((JdbcConnection)conn).isDistributedJoins()); assertFalse(((JdbcConnection)conn).isCollocatedQuery()); - assertTrue(((JdbcConnection)conn).isLazy()); + assertFalse(((JdbcConnection)conn).isLazy()); assertFalse(((JdbcConnection)conn).skipReducerOnUpdate()); } @@ -317,7 +317,7 @@ public class JdbcConnectionSelfTest extends GridCommonAbstractTest { assertFalse(((JdbcConnection)conn).isEnforceJoinOrder()); assertTrue(((JdbcConnection)conn).isDistributedJoins()); assertFalse(((JdbcConnection)conn).isCollocatedQuery()); - assertTrue(((JdbcConnection)conn).isLazy()); + assertFalse(((JdbcConnection)conn).isLazy()); assertFalse(((JdbcConnection)conn).skipReducerOnUpdate()); } @@ -326,15 +326,15 @@ public class JdbcConnectionSelfTest extends GridCommonAbstractTest { assertFalse(((JdbcConnection)conn).isEnforceJoinOrder()); assertFalse(((JdbcConnection)conn).isDistributedJoins()); assertTrue(((JdbcConnection)conn).isCollocatedQuery()); - assertTrue(((JdbcConnection)conn).isLazy()); + assertFalse(((JdbcConnection)conn).isLazy()); assertFalse(((JdbcConnection)conn).skipReducerOnUpdate()); } - try (final Connection conn = DriverManager.getConnection(CFG_URL_PREFIX + "lazy=false@" + configURL())) { + try (final Connection conn = DriverManager.getConnection(CFG_URL_PREFIX + "lazy=true@" + configURL())) { assertFalse(((JdbcConnection)conn).isEnforceJoinOrder()); assertFalse(((JdbcConnection)conn).isDistributedJoins()); assertFalse(((JdbcConnection)conn).isCollocatedQuery()); - assertFalse(((JdbcConnection)conn).isLazy()); + assertTrue(((JdbcConnection)conn).isLazy()); assertFalse(((JdbcConnection)conn).skipReducerOnUpdate()); } try (final Connection conn = DriverManager.getConnection(CFG_URL_PREFIX + "skipReducerOnUpdate=true@" @@ -342,7 +342,7 @@ public class JdbcConnectionSelfTest extends GridCommonAbstractTest { assertFalse(((JdbcConnection)conn).isEnforceJoinOrder()); assertFalse(((JdbcConnection)conn).isDistributedJoins()); assertFalse(((JdbcConnection)conn).isCollocatedQuery()); - assertTrue(((JdbcConnection)conn).isLazy()); + assertFalse(((JdbcConnection)conn).isLazy()); assertTrue(((JdbcConnection)conn).skipReducerOnUpdate()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/0ccde7c4/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java index 26c34cf..80397e6 100644 --- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java @@ -230,36 +230,36 @@ public class JdbcThinConnectionSelfTest extends JdbcThinAbstractSelfTest { */ public void testSqlHints() throws Exception { try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1")) { - assertHints(conn, false, false, false, false, true, false); + assertHints(conn, false, false, false, false, false, false); } try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?distributedJoins=true")) { - assertHints(conn, true, false, false, false, true, false); + assertHints(conn, true, false, false, false, false, false); } try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?enforceJoinOrder=true")) { - assertHints(conn, false, true, false, false, true, false); + assertHints(conn, false, true, false, false, false, false); } try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?collocated=true")) { - assertHints(conn, false, false, true, false, true, false); + assertHints(conn, false, false, true, false, false, false); } try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?replicatedOnly=true")) { - assertHints(conn, false, false, false, true, true, false); + assertHints(conn, false, false, false, true, false, false); } - try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?lazy=false")) { - assertHints(conn, false, false, false, false, false, false); + try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?lazy=true")) { + assertHints(conn, false, false, false, false, true, false); } try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?skipReducerOnUpdate=true")) { - assertHints(conn, false, false, false, false, true, true); + assertHints(conn, false, false, false, false, false, true); } try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?distributedJoins=true&" + - "enforceJoinOrder=true&collocated=true&replicatedOnly=true&lazy=false&skipReducerOnUpdate=true")) { - assertHints(conn, true, true, true, true, false, true); + "enforceJoinOrder=true&collocated=true&replicatedOnly=true&lazy=true&skipReducerOnUpdate=true")) { + assertHints(conn, true, true, true, true, true, true); } } @@ -270,32 +270,32 @@ public class JdbcThinConnectionSelfTest extends JdbcThinAbstractSelfTest { */ public void testSqlHintsSemicolon() throws Exception { try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1;distributedJoins=true")) { - assertHints(conn, true, false, false, false, true, false); + assertHints(conn, true, false, false, false, false, false); } try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1;enforceJoinOrder=true")) { - assertHints(conn, false, true, false, false, true, false); + assertHints(conn, false, true, false, false, false, false); } try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1;collocated=true")) { - assertHints(conn, false, false, true, false, true, false); + assertHints(conn, false, false, true, false, false, false); } try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1;replicatedOnly=true")) { - assertHints(conn, false, false, false, true, true, false); + assertHints(conn, false, false, false, true, false, false); } - try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1;lazy=false")) { - assertHints(conn, false, false, false, false, false, false); + try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1;lazy=true")) { + assertHints(conn, false, false, false, false, true, false); } try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1;skipReducerOnUpdate=true")) { - assertHints(conn, false, false, false, false, true, true); + assertHints(conn, false, false, false, false, false, true); } try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1;distributedJoins=true;" + - "enforceJoinOrder=true;collocated=true;replicatedOnly=true;lazy=false;skipReducerOnUpdate=true")) { - assertHints(conn, true, true, true, true, false, true); + "enforceJoinOrder=true;collocated=true;replicatedOnly=true;lazy=true;skipReducerOnUpdate=true")) { + assertHints(conn, true, true, true, true, true, true); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/0ccde7c4/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDataSourceSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDataSourceSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDataSourceSelfTest.java index 834b4ca..6040bed 100644 --- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDataSourceSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDataSourceSelfTest.java @@ -142,15 +142,15 @@ public class JdbcThinDataSourceSelfTest extends JdbcThinAbstractSelfTest { public void testResetUrl() throws Exception { IgniteJdbcThinDataSource ids = new IgniteJdbcThinDataSource(); - ids.setUrl("jdbc:ignite:thin://127.0.0.1:10800/test?lazy=false"); + ids.setUrl("jdbc:ignite:thin://127.0.0.1:10800/test?lazy=true"); assertEquals("test", ids.getSchema()); - assertFalse(ids.isLazy()); + assertTrue(ids.isLazy()); ids.setUrl("jdbc:ignite:thin://mydomain.org,localhost?collocated=true"); assertNull(ids.getSchema()); - assertTrue(ids.isLazy()); + assertFalse(ids.isLazy()); assertTrue(ids.isCollocated()); } @@ -168,7 +168,7 @@ public class JdbcThinDataSourceSelfTest extends JdbcThinAbstractSelfTest { assertFalse(io.connectionProperties().isAutoCloseServerCursor()); assertFalse(io.connectionProperties().isCollocated()); assertFalse(io.connectionProperties().isEnforceJoinOrder()); - assertTrue(io.connectionProperties().isLazy()); + assertFalse(io.connectionProperties().isLazy()); assertFalse(io.connectionProperties().isDistributedJoins()); assertFalse(io.connectionProperties().isReplicatedOnly()); } @@ -176,7 +176,7 @@ public class JdbcThinDataSourceSelfTest extends JdbcThinAbstractSelfTest { ids.setAutoCloseServerCursor(true); ids.setCollocated(true); ids.setEnforceJoinOrder(true); - ids.setLazy(false); + ids.setLazy(true); ids.setDistributedJoins(true); ids.setReplicatedOnly(true); @@ -186,7 +186,7 @@ public class JdbcThinDataSourceSelfTest extends JdbcThinAbstractSelfTest { assertTrue(io.connectionProperties().isAutoCloseServerCursor()); assertTrue(io.connectionProperties().isCollocated()); assertTrue(io.connectionProperties().isEnforceJoinOrder()); - assertFalse(io.connectionProperties().isLazy()); + assertTrue(io.connectionProperties().isLazy()); assertTrue(io.connectionProperties().isDistributedJoins()); assertTrue(io.connectionProperties().isReplicatedOnly()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/0ccde7c4/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index a1ff376..52d6a36 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -484,12 +484,7 @@ public final class IgniteSystemProperties { /** Disable fallback to H2 SQL parser if the internal SQL parser fails to parse the statement. */ public static final String IGNITE_SQL_PARSER_DISABLE_H2_FALLBACK = "IGNITE_SQL_PARSER_DISABLE_H2_FALLBACK"; - /** - * Force all SQL queries to be processed lazily regardless of what clients request. - * - * @deprecated Since version 2.7. - */ - @Deprecated + /** Force all SQL queries to be processed lazily regardless of what clients request. */ public static final String IGNITE_SQL_FORCE_LAZY_RESULT_SET = "IGNITE_SQL_FORCE_LAZY_RESULT_SET"; /** Disable SQL system views. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/0ccde7c4/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java index 3e5c706..4e12b8c 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java @@ -71,8 +71,8 @@ public class SqlFieldsQuery extends Query<List<?>> { /** */ private boolean replicatedOnly; - /** Lazy mode is default since Ignite v.2.7. */ - private boolean lazy = true; + /** */ + private boolean lazy; /** Partitions for query */ private int[] parts; @@ -292,24 +292,19 @@ public class SqlFieldsQuery extends Query<List<?>> { /** * Sets lazy query execution flag. * <p> + * By default Ignite attempts to fetch the whole query result set to memory and send it to the client. For small + * and medium result sets this provides optimal performance and minimize duration of internal database locks, thus + * increasing concurrency. + * <p> * If result set is too big to fit in available memory this could lead to excessive GC pauses and even * OutOfMemoryError. Use this flag as a hint for Ignite to fetch result set lazily, thus minimizing memory * consumption at the cost of moderate performance hit. - * Now lazy mode is optimized for small and medium result set. Small result set means results rows count - * less then page size (see {@link #setPageSize}). * <p> - * To compatibility with previous version behavior lazy mode may be switched off. In this case Ignite attempts - * to fetch the whole query result set to memory and send it to the client. - * <p> - * Since version 2.7 lazy mode is used by default. - * Defaults to {@code true}, meaning that the result set is fetched lazily if it is possible. + * Defaults to {@code false}, meaning that the whole result set is fetched to memory eagerly. * * @param lazy Lazy query execution flag. * @return {@code this} For chaining. - * - * @deprecated Since Ignite 2.7. */ - @Deprecated public SqlFieldsQuery setLazy(boolean lazy) { this.lazy = lazy; @@ -323,7 +318,6 @@ public class SqlFieldsQuery extends Query<List<?>> { * * @return Lazy flag. */ - @Deprecated public boolean isLazy() { return lazy; } http://git-wip-us.apache.org/repos/asf/ignite/blob/0ccde7c4/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java index 054807a..51a3837 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java @@ -84,7 +84,7 @@ public class ConnectionPropertiesImpl implements ConnectionProperties, Serializa /** Lazy query execution property. */ private BooleanProperty lazy = new BooleanProperty( - "lazy", "Enable lazy query execution (lazy mode is used by default since v.2.7)", true, false); + "lazy", "Enable lazy query execution", false, false); /** Socket send buffer size property. */ private IntegerProperty socketSendBuffer = new IntegerProperty( http://git-wip-us.apache.org/repos/asf/ignite/blob/0ccde7c4/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java index 481794e..c589c06 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java @@ -195,7 +195,7 @@ public class JdbcConnection implements Connection { collocatedQry = Boolean.parseBoolean(props.getProperty(PROP_COLLOCATED)); distributedJoins = Boolean.parseBoolean(props.getProperty(PROP_DISTRIBUTED_JOINS)); enforceJoinOrder = Boolean.parseBoolean(props.getProperty(PROP_ENFORCE_JOIN_ORDER)); - lazy = Boolean.parseBoolean(props.getProperty(PROP_LAZY, "true")); + lazy = Boolean.parseBoolean(props.getProperty(PROP_LAZY)); txAllowed = Boolean.parseBoolean(props.getProperty(PROP_TX_ALLOWED)); stream = Boolean.parseBoolean(props.getProperty(PROP_STREAMING)); http://git-wip-us.apache.org/repos/asf/ignite/blob/0ccde7c4/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index 777a6fc..aa106f8 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -4066,7 +4066,7 @@ public abstract class IgniteUtils { rsrc.close(); } catch (Exception e) { - warn(log, "Failed to close resource: " + e.getMessage(), e); + warn(log, "Failed to close resource: " + e.getMessage()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/0ccde7c4/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ConnectionWrapper.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ConnectionWrapper.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ConnectionWrapper.java index 020cd5e..425015a 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ConnectionWrapper.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ConnectionWrapper.java @@ -33,9 +33,6 @@ public class H2ConnectionWrapper implements AutoCloseable { private final Connection conn; /** */ - private final Thread intiThread; - - /** */ private volatile String schema; /** */ @@ -46,7 +43,6 @@ public class H2ConnectionWrapper implements AutoCloseable { */ H2ConnectionWrapper(Connection conn) { this.conn = conn; - intiThread = Thread.currentThread(); initStatementCache(); } @@ -100,13 +96,6 @@ public class H2ConnectionWrapper implements AutoCloseable { statementCache = new H2StatementCache(STATEMENT_CACHE_SIZE); } - /** - * @return Thread where the connection was created. - */ - public Thread initialThread() { - return intiThread; - } - /** {@inheritDoc} */ @Override public String toString() { return S.toString(H2ConnectionWrapper.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/0ccde7c4/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java index 074a3e4..b9d9d8e 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java @@ -237,25 +237,10 @@ public class H2Utils { * @param enforceJoinOrder Enforce join order of tables. */ public static void setupConnection(Connection conn, boolean distributedJoins, boolean enforceJoinOrder) { - setupConnection(conn,distributedJoins, enforceJoinOrder, false); - } - - /** - * @param conn Connection to use. - * @param distributedJoins If distributed joins are enabled. - * @param enforceJoinOrder Enforce join order of tables. - * @param lazy Lazy query execution mode. - */ - public static void setupConnection( - Connection conn, - boolean distributedJoins, - boolean enforceJoinOrder, - boolean lazy) { Session s = session(conn); s.setForceJoinOrder(enforceJoinOrder); s.setJoinBatchEnabled(distributedJoins); - s.setLazyQueryExecution(lazy); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/0ccde7c4/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 162e3c5..93ea70a 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -136,6 +136,7 @@ import org.apache.ignite.internal.processors.query.h2.sys.view.SqlSystemViewNode import org.apache.ignite.internal.processors.query.h2.sys.view.SqlSystemViewNodes; import org.apache.ignite.internal.processors.query.h2.twostep.GridMapQueryExecutor; import org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor; +import org.apache.ignite.internal.processors.query.h2.twostep.MapQueryLazyWorker; import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest; import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor; import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure; @@ -297,9 +298,9 @@ public class IgniteH2Indexing implements GridQueryIndexing { /** */ private String dbUrl = "jdbc:h2:mem:"; - /** All connections are used by Ignite instance. Map of (H2ConnectionWrapper, Boolean) is used as a Set. */ + /** */ // TODO https://issues.apache.org/jira/browse/IGNITE-9062 - private final ConcurrentMap<Thread, ConcurrentHashMap<H2ConnectionWrapper, Boolean>> conns = new ConcurrentHashMap<>(); + private final ConcurrentMap<Thread, H2ConnectionWrapper> conns = new ConcurrentHashMap<>(); /** */ private GridMapQueryExecutor mapQryExec; @@ -327,23 +328,13 @@ public class IgniteH2Indexing implements GridQueryIndexing { /** */ // TODO https://issues.apache.org/jira/browse/IGNITE-9062 - private final ThreadLocal<ObjectPool<H2ConnectionWrapper>> connectionPool - = new ThreadLocal<ObjectPool<H2ConnectionWrapper>>() { - @Override protected ObjectPool<H2ConnectionWrapper> initialValue() { - return new ObjectPool<>( - IgniteH2Indexing.this::newConnectionWrapper, - 50, - IgniteH2Indexing.this::closePooledConnectionWrapper, - IgniteH2Indexing.this::recycleConnection); - } - }; + private final ThreadLocalObjectPool<H2ConnectionWrapper> connectionPool = new ThreadLocalObjectPool<>(IgniteH2Indexing.this::newConnectionWrapper, 5); /** */ // TODO https://issues.apache.org/jira/browse/IGNITE-9062 - private final ThreadLocal<ObjectPoolReusable<H2ConnectionWrapper>> connCache - = new ThreadLocal<ObjectPoolReusable<H2ConnectionWrapper>>() { - @Override public ObjectPoolReusable<H2ConnectionWrapper> get() { - ObjectPoolReusable<H2ConnectionWrapper> reusable = super.get(); + private final ThreadLocal<ThreadLocalObjectPool.Reusable<H2ConnectionWrapper>> connCache = new ThreadLocal<ThreadLocalObjectPool.Reusable<H2ConnectionWrapper>>() { + @Override public ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> get() { + ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> reusable = super.get(); boolean reconnect = true; @@ -363,21 +354,10 @@ public class IgniteH2Indexing implements GridQueryIndexing { return reusable; } - @Override protected ObjectPoolReusable<H2ConnectionWrapper> initialValue() { - ObjectPool<H2ConnectionWrapper> pool = connectionPool.get(); - - ObjectPoolReusable<H2ConnectionWrapper> reusableConnection = pool.borrow(); - - ConcurrentHashMap<H2ConnectionWrapper, Boolean> perThreadConns = conns.get(Thread.currentThread()); + @Override protected ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> initialValue() { + ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> reusableConnection = connectionPool.borrow(); - ConcurrentHashMap<H2ConnectionWrapper, Boolean> newMap = new ConcurrentHashMap<>(); - - perThreadConns = conns.putIfAbsent(Thread.currentThread(), newMap); - - if (perThreadConns == null) - perThreadConns = newMap; - - perThreadConns.put(reusableConnection.object(), false); + conns.put(Thread.currentThread(), reusableConnection.object()); return reusableConnection; } @@ -457,54 +437,16 @@ public class IgniteH2Indexing implements GridQueryIndexing { return sysConn; } - /** - * @return Connection wrapper. - */ + /** */ private H2ConnectionWrapper newConnectionWrapper() { try { - Connection c = DriverManager.getConnection(dbUrl); - return new H2ConnectionWrapper(c); + return new H2ConnectionWrapper(DriverManager.getConnection(dbUrl)); } catch (SQLException e) { throw new IgniteSQLException("Failed to initialize DB connection: " + dbUrl, e); } } /** - * @param conn Connection wrapper to close. - */ - private void closePooledConnectionWrapper(H2ConnectionWrapper conn) { - conns.get(conn.initialThread()).remove(conn); - - U.closeQuiet(conn); - } - - /** - * Removes from threadlocal cache and returns associated with current thread connection. - * @return Connection associated with current thread. - */ - public ObjectPoolReusable<H2ConnectionWrapper> detachConnection() { - ObjectPoolReusable<H2ConnectionWrapper> reusableConnection = connCache.get(); - - connCache.remove(); - - conns.get(Thread.currentThread()).remove(reusableConnection.object()); - - return reusableConnection; - } - - /** - * Return connection to the glob all connection collection. - * @param conn Recycled connection. - */ - private void recycleConnection(H2ConnectionWrapper conn) { - ConcurrentMap<H2ConnectionWrapper, Boolean> perThreadConns = conns.get(conn.initialThread()); - - // Mau be null when node is stopping. - if (perThreadConns != null) - perThreadConns.put(conn, false); - } - - /** * @param c Connection. * @param sql SQL. * @return <b>Cached</b> prepared statement. @@ -796,12 +738,12 @@ public class IgniteH2Indexing implements GridQueryIndexing { * Handles SQL exception. */ private void onSqlException() { - H2ConnectionWrapper conn = connCache.get().object(); + Connection conn = connCache.get().object().connection(); connCache.set(null); if (conn != null) { - conns.get(Thread.currentThread()).remove(conn); + conns.remove(Thread.currentThread()); // Reset connection to receive new one at next call. U.close(conn, log); @@ -1448,15 +1390,31 @@ public class IgniteH2Indexing implements GridQueryIndexing { */ private ResultSet executeSqlQuery(final Connection conn, final PreparedStatement stmt, int timeoutMillis, @Nullable GridQueryCancel cancel) throws IgniteCheckedException { - if (cancel != null) - cancel.set(() -> cancelStatement(stmt)); + final MapQueryLazyWorker lazyWorker = MapQueryLazyWorker.currentWorker(); + + if (cancel != null) { + cancel.set(new Runnable() { + @Override public void run() { + if (lazyWorker != null) { + lazyWorker.submit(new Runnable() { + @Override public void run() { + cancelStatement(stmt); + } + }); + } + else + cancelStatement(stmt); + } + }); + } Session ses = H2Utils.session(conn); if (timeoutMillis > 0) ses.setQueryTimeout(timeoutMillis); - else - ses.setQueryTimeout(0); + + if (lazyWorker != null) + ses.setLazyQueryExecution(true); try { return stmt.executeQuery(); @@ -1468,6 +1426,13 @@ public class IgniteH2Indexing implements GridQueryIndexing { throw new IgniteCheckedException("Failed to execute SQL query. " + e.getMessage(), e); } + finally { + if (timeoutMillis > 0) + ses.setQueryTimeout(0); + + if (lazyWorker != null) + ses.setLazyQueryExecution(false); + } } /** @@ -2578,11 +2543,6 @@ public class IgniteH2Indexing implements GridQueryIndexing { topVer, mvccSnapshot); } - /** - * @param flags Flags holder. - * @param flag Flag mask to check. - * @return {@code true} if flag is set, otherwise returns {@code false}. - */ private boolean isFlagSet(int flags, int flag) { return (flags & flag) == flag; } @@ -3060,24 +3020,18 @@ public class IgniteH2Indexing implements GridQueryIndexing { private void cleanupStatementCache() { long now = U.currentTimeMillis(); - for (Iterator<Map.Entry<Thread, ConcurrentHashMap<H2ConnectionWrapper, Boolean>>> it - = conns.entrySet().iterator(); it.hasNext(); ) { - Map.Entry<Thread, ConcurrentHashMap<H2ConnectionWrapper, Boolean>> entry = it.next(); + for (Iterator<Map.Entry<Thread, H2ConnectionWrapper>> it = conns.entrySet().iterator(); it.hasNext(); ) { + Map.Entry<Thread, H2ConnectionWrapper> entry = it.next(); Thread t = entry.getKey(); if (t.getState() == Thread.State.TERMINATED) { - for (H2ConnectionWrapper c : entry.getValue().keySet()) - U.close(c, log); + U.close(entry.getValue(), log); it.remove(); } - else { - for (H2ConnectionWrapper c : entry.getValue().keySet()) { - if (now - c.statementCache().lastUsage() > STATEMENT_CACHE_THREAD_USAGE_TIMEOUT) - c.clearStatementCache(); - } - } + else if (now - entry.getValue().statementCache().lastUsage() > STATEMENT_CACHE_THREAD_USAGE_TIMEOUT) + entry.getValue().clearStatementCache(); } } @@ -3085,15 +3039,13 @@ public class IgniteH2Indexing implements GridQueryIndexing { * Called periodically by {@link GridTimeoutProcessor} to clean up the {@link #conns}. */ private void cleanupConnections() { - for (Iterator<Map.Entry<Thread, ConcurrentHashMap<H2ConnectionWrapper, Boolean>>> it - = conns.entrySet().iterator(); it.hasNext(); ) { - Map.Entry<Thread, ConcurrentHashMap<H2ConnectionWrapper, Boolean>> entry = it.next(); + for (Iterator<Map.Entry<Thread, H2ConnectionWrapper>> it = conns.entrySet().iterator(); it.hasNext(); ) { + Map.Entry<Thread, H2ConnectionWrapper> entry = it.next(); Thread t = entry.getKey(); if (t.getState() == Thread.State.TERMINATED) { - for (H2ConnectionWrapper c : entry.getValue().keySet()) - U.close(c, log); + U.close(entry.getValue(), log); it.remove(); } @@ -3101,6 +3053,24 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** + * Removes from cache and returns associated with current thread connection. + * @return Connection associated with current thread. + */ + public ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> detach() { + Thread key = Thread.currentThread(); + + ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> reusableConnection = connCache.get(); + + H2ConnectionWrapper connection = conns.remove(key); + + connCache.remove(); + + assert reusableConnection.object().connection() == connection.connection(); + + return reusableConnection; + } + + /** * Rebuild indexes from hash index. * * @param cacheName Cache name. @@ -3464,15 +3434,10 @@ public class IgniteH2Indexing implements GridQueryIndexing { if (log.isDebugEnabled()) log.debug("Stopping cache query index..."); - mapQryExec.stop(); - - for (ConcurrentMap<H2ConnectionWrapper, Boolean> perThreadConns : conns.values()) { - for (H2ConnectionWrapper c : perThreadConns.keySet()) - U.close(c, log); - } + mapQryExec.cancelLazyWorkers(); - connectionPool.remove(); - connCache.remove(); + for (H2ConnectionWrapper c : conns.values()) + U.close(c, log); conns.clear(); schemas.clear(); @@ -3581,7 +3546,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { } } - conns.values().forEach(map -> map.keySet().forEach(H2ConnectionWrapper::clearStatementCache)); + conns.values().forEach(H2ConnectionWrapper::clearStatementCache); for (H2TableDescriptor tbl : rmvTbls) { for (Index idx : tbl.table().getIndexes()) @@ -3739,10 +3704,10 @@ public class IgniteH2Indexing implements GridQueryIndexing { /** {@inheritDoc} */ @Override public void cancelAllQueries() { - for (ConcurrentHashMap<H2ConnectionWrapper, Boolean> perThreadConns : conns.values()) { - for (H2ConnectionWrapper c : perThreadConns.keySet()) - U.close(c, log); - } + mapQryExec.cancelLazyWorkers(); + + for (H2ConnectionWrapper c : conns.values()) + U.close(c, log); } /** @@ -3792,7 +3757,6 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** - * @param twoStepQry Query. * @return {@code True} is system views exist. */ private boolean hasSystemViews(GridCacheTwoStepQuery twoStepQry) { http://git-wip-us.apache.org/repos/asf/ignite/blob/0ccde7c4/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ObjectPool.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ObjectPool.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ObjectPool.java deleted file mode 100644 index 9d2a580..0000000 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ObjectPool.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.query.h2; - -import org.apache.ignite.internal.util.typedef.internal.U; - -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.function.Consumer; -import java.util.function.Supplier; - -/** - * Thread-safe pool for managing limited number objects for further reuse. - * - * @param <E> Pooled objects type. - */ -public final class ObjectPool<E extends AutoCloseable> { - /** */ - private final Supplier<E> objectFactory; - - /** */ - private final ConcurrentLinkedQueue<E> bag = new ConcurrentLinkedQueue<>(); - - /** */ - private final int poolSize; - - /** The function to close object. */ - private final Consumer<E> closer; - - /** The listener is called when object is returned to the pool. */ - private final Consumer<E> recycler; - - /** - * @param objectFactory Factory used for new objects creation. - * @param poolSize Number of objects which pool can contain. - * @param closer Function to close object. - * @param recycler The listener is called when object is returned to the pool. - */ - public ObjectPool(Supplier<E> objectFactory, int poolSize, Consumer<E> closer, Consumer<E> recycler) { - this.objectFactory = objectFactory; - this.poolSize = poolSize; - this.closer = closer != null ? closer : U::closeQuiet; - this.recycler = recycler; - } - - /** - * Picks an object from the pool if one is present or creates new one otherwise. - * Returns an object wrapper which could be returned to the pool. - * - * @return Reusable object wrapper. - */ - public ObjectPoolReusable<E> borrow() { - E pooled = bag.poll(); - - return new ObjectPoolReusable<>(this, pooled != null ? pooled : objectFactory.get()); - } - - /** - * Recycles an object. - * - * @param object Object. - */ - void recycle(E object) { - assert object != null : "Already recycled"; - - if (bag.size() < poolSize) { - bag.add(object); - - if (recycler != null) - recycler.accept(object); - } - else - closer.accept(object); - } - - /** - * Visible for test - * @return Pool bag size. - */ - int bagSize() { - return bag.size(); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/0ccde7c4/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ObjectPoolReusable.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ObjectPoolReusable.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ObjectPoolReusable.java deleted file mode 100644 index 48fee42..0000000 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ObjectPoolReusable.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.query.h2; - -/** - * Wrapper for a pooled object with capability to return the object to a pool. - * - * @param <T> Enclosed object type. - */ -public class ObjectPoolReusable<T extends AutoCloseable> { - /** Object pool to recycle. */ - private final ObjectPool<T> pool; - - /** Detached object. */ - private T object; - - /** - * @param pool Object pool. - * @param object Detached object. - */ - ObjectPoolReusable(ObjectPool<T> pool, T object) { - this.pool = pool; - this.object = object; - } - - /** - * @return Enclosed object. - */ - public T object() { - return object; - } - - /** - * Returns an object to a pool or closes it if the pool is already full. - */ - public void recycle() { - assert object != null : "Already recycled"; - - pool.recycle(object); - - object = null; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/0ccde7c4/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ThreadLocalObjectPool.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ThreadLocalObjectPool.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ThreadLocalObjectPool.java new file mode 100644 index 0000000..25daa23 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ThreadLocalObjectPool.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2; + +import java.util.LinkedList; +import java.util.Queue; +import java.util.function.Supplier; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * Special pool for managing limited number objects for further reuse. + * This pool maintains separate object bag for each thread by means of {@link ThreadLocal}. + * <p> + * If object is borrowed on one thread and recycled on different then it will be returned to + * recycling thread bag. For thread-safe use either pooled objects should be thread-safe or + * <i>happens-before</i> should be established between borrowing object and subsequent recycling. + * + * @param <E> pooled objects type + */ +public final class ThreadLocalObjectPool<E extends AutoCloseable> { + /** + * Wrapper for a pooled object with capability to return the object to a pool. + * + * @param <T> enclosed object type + */ + public static class Reusable<T extends AutoCloseable> { + /** */ + private final ThreadLocalObjectPool<T> pool; + /** */ + private final T object; + + /** */ + private Reusable(ThreadLocalObjectPool<T> pool, T object) { + this.pool = pool; + this.object = object; + } + + /** + * @return enclosed object + */ + public T object() { + return object; + } + + /** + * Returns an object to a pool or closes it if the pool is already full. + */ + public void recycle() { + Queue<Reusable<T>> bag = pool.bag.get(); + if (bag.size() < pool.poolSize) + bag.add(this); + else + U.closeQuiet(object); + } + } + + /** */ + private final Supplier<E> objectFactory; + /** */ + private final ThreadLocal<Queue<Reusable<E>>> bag = ThreadLocal.withInitial(LinkedList::new); + /** */ + private final int poolSize; + + /** + * @param objectFactory factory used for new objects creation + * @param poolSize number of objects which pool can contain + */ + public ThreadLocalObjectPool(Supplier<E> objectFactory, int poolSize) { + this.objectFactory = objectFactory; + this.poolSize = poolSize; + } + + /** + * Picks an object from the pool if one is present or creates new one otherwise. + * Returns an object wrapper which could be returned to the pool. + * + * @return reusable object wrapper + */ + public Reusable<E> borrow() { + Reusable<E> pooled = bag.get().poll(); + return pooled != null ? pooled : new Reusable<>(this, objectFactory.get()); + } + + /** Visible for test */ + int bagSize() { + return bag.get().size(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0ccde7c4/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java index 31a444e..ba4b12b 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java @@ -36,7 +36,7 @@ import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.processors.query.UpdateSourceIterator; import org.apache.ignite.internal.processors.query.h2.H2ConnectionWrapper; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; -import org.apache.ignite.internal.processors.query.h2.ObjectPoolReusable; +import org.apache.ignite.internal.processors.query.h2.ThreadLocalObjectPool; import org.apache.ignite.internal.processors.query.h2.UpdateResult; import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor; import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; @@ -623,7 +623,7 @@ public final class UpdatePlan { private final EnlistOperation op; /** */ - private volatile ObjectPoolReusable<H2ConnectionWrapper> conn; + private volatile ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> conn; /** * @param idx Indexing. @@ -647,7 +647,7 @@ public final class UpdatePlan { /** {@inheritDoc} */ @Override public void beforeDetach() { - ObjectPoolReusable<H2ConnectionWrapper> conn0 = conn = idx.detachConnection(); + ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> conn0 = conn = idx.detach(); if (isClosed()) conn0.recycle(); @@ -657,7 +657,7 @@ public final class UpdatePlan { @Override protected void onClose() { cur.close(); - ObjectPoolReusable<H2ConnectionWrapper> conn0 = conn; + ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> conn0 = conn; if (conn0 != null) conn0.recycle(); http://git-wip-us.apache.org/repos/asf/ignite/blob/0ccde7c4/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java index 9971b78..f12c0f3 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java @@ -18,10 +18,8 @@ package org.apache.ignite.internal.processors.query.h2.opt; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -91,7 +89,7 @@ public class GridH2QueryContext { private MvccSnapshot mvccSnapshot; /** */ - private Set<GridH2Table> lockedTables = new HashSet<>(); + private MapQueryLazyWorker lazyWorker; /** * @param locNodeId Local node ID. @@ -353,8 +351,7 @@ public class GridH2QueryContext { assert qctx.get() == null; // We need MAP query context to be available to other threads to run distributed joins. - if (x.key.type == MAP && x.distributedJoinMode() != OFF && qctxs.putIfAbsent(x.key, x) != null - && MapQueryLazyWorker.currentWorker() == null) + if (x.key.type == MAP && x.distributedJoinMode() != OFF && qctxs.putIfAbsent(x.key, x) != null) throw new IllegalStateException("Query context is already set."); qctx.set(x); @@ -404,7 +401,10 @@ public class GridH2QueryContext { assert x.key.equals(key); - x.clearContext(nodeStop); + if (x.lazyWorker() != null) + x.lazyWorker().stop(nodeStop); + else + x.clearContext(nodeStop); return true; } @@ -413,10 +413,7 @@ public class GridH2QueryContext { * @param nodeStop Node is stopping. */ @SuppressWarnings("ForLoopReplaceableByForEach") - public synchronized void clearContext(boolean nodeStop) { - if (cleared) - return; - + public void clearContext(boolean nodeStop) { cleared = true; List<GridReservable> r = reservations; @@ -519,10 +516,20 @@ public class GridH2QueryContext { } /** - * @return The set of tables have been locked by current thread. + * @return Lazy worker, if any, or {@code null} if none. */ - public Set<GridH2Table> lockedTables() { - return lockedTables; + public MapQueryLazyWorker lazyWorker() { + return lazyWorker; + } + + /** + * @param lazyWorker Lazy worker, if any, or {@code null} if none. + * @return {@code this}. + */ + public GridH2QueryContext lazyWorker(MapQueryLazyWorker lazyWorker) { + this.lazyWorker = lazyWorker; + + return this; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/0ccde7c4/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java index 709ded7..a612b63 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java @@ -17,6 +17,17 @@ package org.apache.ignite.internal.processors.query.h2.opt; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAdder; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -26,7 +37,7 @@ import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.internal.processors.query.QueryField; import org.apache.ignite.internal.processors.query.h2.database.H2RowFactory; import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndex; -import org.apache.ignite.internal.processors.query.h2.twostep.MapQueryLazyWorker; +import org.apache.ignite.internal.processors.query.h2.twostep.GridMapQueryExecutor; import org.apache.ignite.internal.util.typedef.F; import org.h2.command.ddl.CreateTableData; import org.h2.command.dml.Insert; @@ -47,19 +58,6 @@ import org.h2.table.TableType; import org.h2.value.DataType; import org.jetbrains.annotations.Nullable; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.LongAdder; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.DEFAULT_COLUMNS_COUNT; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.KEY_COL; @@ -92,12 +90,6 @@ public class GridH2Table extends TableBase { /** */ private final ReadWriteLock lock; - /** Number of reading threads which currently move execution from query pool to dedicated thread. */ - private final AtomicInteger lazyTransferCnt = new AtomicInteger(); - - /** Has writer that waits lock in the loop. */ - private volatile boolean hasWaitedWriter; - /** */ private boolean destroyed; @@ -273,11 +265,6 @@ public class GridH2Table extends TableBase { ses.addLock(this); - GridH2QueryContext qctx = GridH2QueryContext.get(); - - if (qctx != null) - qctx.lockedTables().add(this); - return false; } @@ -304,44 +291,15 @@ public class GridH2Table extends TableBase { Lock l = exclusive ? lock.writeLock() : lock.readLock(); try { - if (exclusive) { - // Attempting to obtain exclusive lock for DDL. - // Lock is considered acquired only if "lazyTransferCnt" is zero, meaning that - // currently there are no reader threads moving query execution from query - // pool to dedicated thread. - // It is possible that reader which is currently transferring execution gets - // queued after the write lock we are trying to acquire. So we use timed waiting - // and a loop to avoid deadlocks. - for (;;) { - if (l.tryLock(200, TimeUnit.MILLISECONDS)) { - if (lazyTransferCnt.get() == 0) - break; - else - l.unlock(); - } - - hasWaitedWriter = true; - - Thread.yield(); - } - - hasWaitedWriter = false; - } + if (!exclusive || !GridMapQueryExecutor.FORCE_LAZY) + l.lockInterruptibly(); else { - // Attempt to acquire read lock (query execution, DML, cache update). - // If query is being executed inside a query pool, we do not want it to be blocked - // for a long time, as it would prevent other queries from being executed. So we - // wait a little and then force transfer to dedicated thread by throwing special - // timeout exception.GridNioSslSelfTest - // If query is not in the query pool, then we simply wait for lock acquisition. - if (isSqlNotInLazy()) { - if (hasWaitedWriter || !l.tryLock(200, TimeUnit.MILLISECONDS)) { - throw new GridH2RetryException("Long wait on Table lock: [tableName=" + getName() - + ", hasWaitedWriter=" + hasWaitedWriter + ']'); - } + for (;;) { + if (l.tryLock(200, TimeUnit.MILLISECONDS)) + break; + else + Thread.yield(); } - else - l.lockInterruptibly(); } } catch (InterruptedException e) { @@ -363,49 +321,6 @@ public class GridH2Table extends TableBase { } /** - * Check if table is being locked in not lazy thread by SQL query. - * - * @return {@code True} if is in query pool. - */ - private static boolean isSqlNotInLazy() { - return GridH2QueryContext.get() != null && MapQueryLazyWorker.currentWorker() == null; - } - - /** - * Callback invoked when session is to be transferred to lazy thread. In order to prevent concurrent changes - * by DDL during move we increment counter before releasing read lock. - * - * @param ses Session. - */ - public void onLazyTransferStarted(Session ses) { - assert sessions.containsKey(ses) : "Detached session have not locked the table: " + getName(); - - lazyTransferCnt.incrementAndGet(); - - lock.readLock().unlock(); - } - - /** - * Callback invoked when lazy transfer finished. Acquire the lock, decrement transfer counter. - * - * @param ses Session to detach. - */ - public void onLazyTransferFinished(Session ses) { - assert sessions.containsKey(ses) : "Attached session have not locked the table: " + getName(); - - try { - lock.readLock().lockInterruptibly(); - - lazyTransferCnt.decrementAndGet(); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - throw new IgniteInterruptedException("Thread got interrupted while trying to acquire table lock.", e); - } - } - - /** * Check if table is not destroyed. */ private void ensureNotDestroyed() { @@ -495,11 +410,6 @@ public class GridH2Table extends TableBase { if (exclusive == null) return; - GridH2QueryContext qctx = GridH2QueryContext.get(); - - if (qctx != null) - qctx.lockedTables().remove(this); - unlock(exclusive); } @@ -1039,10 +949,9 @@ public class GridH2Table extends TableBase { } /** - * Drop columns. * - * @param cols Columns. - * @param ifExists IF EXISTS flag. + * @param cols + * @param ifExists */ public void dropColumns(List<String> cols, boolean ifExists) { assert !ifExists || cols.size() == 1;
