Repository: ignite Updated Branches: refs/heads/ignite-2.7 6a7e0d168 -> c8ba2ac14
IGNITE-10007: MVCC: fixed a bug preventing transaction commit on deactivated cluster. This closes #5201. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c8ba2ac1 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c8ba2ac1 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c8ba2ac1 Branch: refs/heads/ignite-2.7 Commit: c8ba2ac142a2513f603dc00895444c0512659fe7 Parents: 6a7e0d1 Author: ipavlukhin <vololo...@gmail.com> Authored: Wed Oct 31 11:22:38 2018 +0300 Committer: devozerov <voze...@gridgain.com> Committed: Wed Oct 31 11:23:21 2018 +0300 ---------------------------------------------------------------------- .../suite/IgniteJdbcDriverMvccTestSuite.java | 2 + ...ctionFinishOnDeactivatedClusterSelfTest.java | 156 +++++++++++++++++++ .../processors/query/GridQueryProcessor.java | 22 +-- .../processors/query/h2/IgniteH2Indexing.java | 45 ++++-- 4 files changed, 198 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/c8ba2ac1/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverMvccTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverMvccTestSuite.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverMvccTestSuite.java index df8054f..66eafae 100644 --- a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverMvccTestSuite.java +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverMvccTestSuite.java @@ -25,6 +25,7 @@ import org.apache.ignite.jdbc.thin.JdbcThinTransactionsClientNoAutoCommitComplex import org.apache.ignite.jdbc.thin.JdbcThinTransactionsServerAutoCommitComplexSelfTest; import org.apache.ignite.jdbc.thin.JdbcThinTransactionsServerNoAutoCommitComplexSelfTest; import org.apache.ignite.jdbc.thin.JdbcThinTransactionsWithMvccEnabledSelfTest; +import org.apache.ignite.jdbc.thin.MvccJdbcTransactionFinishOnDeactivatedClusterSelfTest; /** */ public class IgniteJdbcDriverMvccTestSuite extends TestSuite { @@ -43,6 +44,7 @@ public class IgniteJdbcDriverMvccTestSuite extends TestSuite { suite.addTestSuite(JdbcThinTransactionsServerAutoCommitComplexSelfTest.class); suite.addTestSuite(JdbcThinTransactionsClientNoAutoCommitComplexSelfTest.class); suite.addTestSuite(JdbcThinTransactionsServerNoAutoCommitComplexSelfTest.class); + suite.addTestSuite(MvccJdbcTransactionFinishOnDeactivatedClusterSelfTest.class); return suite; } http://git-wip-us.apache.org/repos/asf/ignite/blob/c8ba2ac1/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/MvccJdbcTransactionFinishOnDeactivatedClusterSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/MvccJdbcTransactionFinishOnDeactivatedClusterSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/MvccJdbcTransactionFinishOnDeactivatedClusterSelfTest.java new file mode 100644 index 0000000..8a4fbe3 --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/MvccJdbcTransactionFinishOnDeactivatedClusterSelfTest.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.jdbc.thin; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.Statement; +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import org.apache.ignite.configuration.ConnectorConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.client.GridClient; +import org.apache.ignite.internal.client.GridClientClusterState; +import org.apache.ignite.internal.client.GridClientConfiguration; +import org.apache.ignite.internal.client.GridClientFactory; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** */ +public class MvccJdbcTransactionFinishOnDeactivatedClusterSelfTest extends GridCommonAbstractTest { + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + return super.getConfiguration(igniteInstanceName) + .setConnectorConfiguration(new ConnectorConfiguration()) + .setDataStorageConfiguration(new DataStorageConfiguration().setDefaultDataRegionConfiguration( + new DataRegionConfiguration().setPersistenceEnabled(true)) + ); + } + + /** + * @throws Exception If failed. + */ + public void testTxCommitAfterDeactivation() throws Exception { + checkTxFinishAfterDeactivation(true); + } + + /** + * @throws Exception If failed. + */ + public void testTxRollbackAfterDeactivation() throws Exception { + checkTxFinishAfterDeactivation(false); + } + + /** */ + public void checkTxFinishAfterDeactivation(boolean commit) throws Exception { + IgniteEx node0 = startGrid(0); + + node0.cluster().active(true); + + try (Connection conn = connect()) { + execute(conn, "CREATE TABLE t1(a INT, b VARCHAR, PRIMARY KEY(a)) WITH \"atomicity=TRANSACTIONAL_SNAPSHOT,backups=1\""); + } + + final CountDownLatch enlistedLatch = new CountDownLatch(1); + + assert node0.cluster().active(); + + IgniteInternalFuture txFinishedFut = GridTestUtils.runAsync(() -> { + executeTransaction(commit, enlistedLatch, () -> !node0.context().state().publicApiActiveState(true)); + + return null; + }); + + enlistedLatch.await(); + + deactivateThroughClient(); + + log.info(">>> Cluster deactivated ..."); + + try { + txFinishedFut.get(); + } + catch (Exception e) { + e.printStackTrace(); + + fail("Exception is not expected here"); + } + } + + /** */ + private void executeTransaction(boolean commit, CountDownLatch enlistedLatch, + GridAbsPredicate beforeCommitCondition) throws Exception { + try (Connection conn = connect()) { + execute(conn, "BEGIN"); + + execute(conn, "INSERT INTO t1 VALUES (1, '1')"); + + log.info(">>> Started transaction and enlisted entries"); + + enlistedLatch.countDown(); + + GridTestUtils.waitForCondition(beforeCommitCondition, 5_000); + + log.info(">>> Attempting to finish transaction"); + + execute(conn, commit ? "COMMIT" : "ROLLBACK"); + } + } + + /** */ + private static Connection connect() throws Exception { + return DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1"); + } + + /** */ + private static void execute(Connection conn, String sql) throws Exception { + try (Statement stmt = conn.createStatement()) { + stmt.executeUpdate(sql); + } + } + + /** */ + private void deactivateThroughClient() throws Exception { + GridClientConfiguration clientCfg = new GridClientConfiguration(); + + clientCfg.setServers(Collections.singletonList("127.0.0.1:11211")); + + try (GridClient client = GridClientFactory.start(clientCfg)) { + GridClientClusterState state = client.state(); + + log.info(">>> Try to deactivate ..."); + + state.active(false); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/c8ba2ac1/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index b5b104d..accccbb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -2115,12 +2115,6 @@ public class GridQueryProcessor extends GridProcessorAdapter { validateSqlFieldsQuery(qry, ctx, cctx); - if (!ctx.state().publicApiActiveState(true)) { - throw new IgniteException("Can not perform the operation because the cluster is inactive. Note, that " + - "the cluster is considered inactive by default if Ignite Persistent Store is used to let all the nodes " + - "join the cluster. To activate the cluster call Ignite.active(true)."); - } - if (!busyLock.enterBusy()) throw new IllegalStateException("Failed to execute query (grid is stopping)."); @@ -2526,15 +2520,15 @@ public class GridQueryProcessor extends GridProcessorAdapter { for (QueryField col : cols) { try { props.add(new QueryBinaryProperty( - ctx, + ctx, col.name(), - null, - Class.forName(col.typeName()), - false, - null, - !col.isNullable(), - null, - col.precision(), + null, + Class.forName(col.typeName()), + false, + null, + !col.isNullable(), + null, + col.precision(), col.scale())); } catch (ClassNotFoundException e) { http://git-wip-us.apache.org/repos/asf/ignite/blob/c8ba2ac1/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 56a5e2f..5adb6f16 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -1897,27 +1897,24 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** - * Try executing query using native facilities. + * Determines if a passed query can be executed natively. * * @param schemaName Schema name. * @param qry Query. - * @param cliCtx Client context, or {@code null} if not applicable. - * @return Result or {@code null} if cannot parse/process this query. + * @return Command or {@code null} if cannot parse this query. */ - @SuppressWarnings({"ConstantConditions", "StatementWithEmptyBody"}) - private List<FieldsQueryCursor<List<?>>> tryQueryDistributedSqlFieldsNative(String schemaName, SqlFieldsQuery qry, - @Nullable SqlClientContext cliCtx) { + @Nullable private SqlCommand parseQueryNative(String schemaName, SqlFieldsQuery qry) { // Heuristic check for fast return. if (!INTERNAL_CMD_RE.matcher(qry.getSql().trim()).find()) return null; - // Parse. - SqlCommand cmd; - try { SqlParser parser = new SqlParser(schemaName, qry.getSql()); - cmd = parser.nextCommand(); + SqlCommand cmd = parser.nextCommand(); + + // TODO support transansaction commands in multistatements + // https://issues.apache.org/jira/browse/IGNITE-10063 // No support for multiple commands for now. if (parser.nextCommand() != null) @@ -1935,6 +1932,8 @@ public class IgniteH2Indexing implements GridQueryIndexing { || cmd instanceof SqlAlterUserCommand || cmd instanceof SqlDropUserCommand)) return null; + + return cmd; } catch (SqlStrictParseException e) { throw new IgniteSQLException(e.getMessage(), IgniteQueryErrorCode.PARSING, e); @@ -1955,7 +1954,18 @@ public class IgniteH2Indexing implements GridQueryIndexing { throw new IgniteSQLException("Failed to parse DDL statement: " + qry.getSql() + ": " + e.getMessage(), code, e); } + } + /** + * Executes a query natively. + * + * @param qry Query. + * @param cmd Parsed command corresponding to query. + * @param cliCtx Client context, or {@code null} if not applicable. + * @return Result cursors. + */ + private List<FieldsQueryCursor<List<?>>> queryDistributedSqlFieldsNative(SqlFieldsQuery qry, SqlCommand cmd, + @Nullable SqlClientContext cliCtx) { // Execute. try { if (cmd instanceof SqlCreateIndexCommand @@ -2118,10 +2128,19 @@ public class IgniteH2Indexing implements GridQueryIndexing { boolean mvccEnabled = mvccEnabled(ctx), startTx = autoStartTx(qry); try { - List<FieldsQueryCursor<List<?>>> res = tryQueryDistributedSqlFieldsNative(schemaName, qry, cliCtx); + SqlCommand nativeCmd = parseQueryNative(schemaName, qry); + + if (!(nativeCmd instanceof SqlCommitTransactionCommand || nativeCmd instanceof SqlRollbackTransactionCommand) + && !ctx.state().publicApiActiveState(true)) { + throw new IgniteException("Can not perform the operation because the cluster is inactive. Note, that " + + "the cluster is considered inactive by default if Ignite Persistent Store is used to let all the nodes " + + "join the cluster. To activate the cluster call Ignite.active(true)."); + } + + if (nativeCmd != null) + return queryDistributedSqlFieldsNative(qry, nativeCmd, cliCtx); - if (res != null) - return res; + List<FieldsQueryCursor<List<?>>> res; { // First, let's check if we already have a two-step query for this statement...