IGNITE-9320: MVCC configuration: added new CacheAtomicityMode.MVCC_SNAPSHOT. This closes #4660.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2ab94934 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2ab94934 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2ab94934 Branch: refs/heads/master Commit: 2ab949349050a5affc66f01d4f2f00b81776143c Parents: f018251 Author: rkondakov <[email protected]> Authored: Mon Sep 17 13:08:19 2018 +0300 Committer: devozerov <[email protected]> Committed: Mon Sep 17 13:08:19 2018 +0300 ---------------------------------------------------------------------- .../JdbcThinConnectionMvccEnabledSelfTest.java | 4 +- ...ThinTransactionsAbstractComplexSelfTest.java | 10 +- .../jdbc/thin/JdbcThinTransactionsSelfTest.java | 6 +- ...ThinTransactionsWithMvccEnabledSelfTest.java | 6 +- .../apache/ignite/cache/CacheAtomicityMode.java | 33 +- .../configuration/IgniteConfiguration.java | 48 +-- .../apache/ignite/internal/IgniteKernal.java | 4 +- .../ignite/internal/IgniteNodeAttributes.java | 3 - .../discovery/GridDiscoveryManager.java | 18 +- .../affinity/GridAffinityAssignmentCache.java | 5 +- .../processors/cache/CacheGroupContext.java | 21 +- .../processors/cache/ClusterCachesInfo.java | 20 +- .../processors/cache/GridCacheContext.java | 10 +- .../processors/cache/GridCacheProcessor.java | 70 +++-- .../mvcc/MvccPreviousCoordinatorQueries.java | 7 +- .../processors/cache/mvcc/MvccProcessor.java | 44 ++- .../cache/mvcc/MvccProcessorImpl.java | 309 ++++++++++++------- .../processors/cache/mvcc/MvccUtils.java | 27 +- .../cache/mvcc/NoOpMvccProcessor.java | 215 ------------- .../continuous/CacheContinuousQueryManager.java | 2 +- .../odbc/jdbc/JdbcRequestHandler.java | 9 +- .../odbc/odbc/OdbcRequestHandler.java | 11 +- .../mvcc/CacheMvccAbstractFeatureTest.java | 2 +- .../cache/mvcc/CacheMvccAbstractTest.java | 21 +- .../cache/mvcc/CacheMvccClusterRestartTest.java | 6 +- .../CacheMvccConfigurationValidationTest.java | 245 ++++++++++++++- .../mvcc/CacheMvccOperationChecksTest.java | 4 +- .../mvcc/CacheMvccProcessorLazyStartTest.java | 175 +++++++++++ .../cache/mvcc/CacheMvccProcessorTest.java | 4 + .../cache/mvcc/CacheMvccVacuumTest.java | 36 ++- .../MemoryPolicyInitializationTest.java | 8 +- .../DataStreamProcessorMvccSelfTest.java | 17 +- .../DataStreamProcessorSelfTest.java | 20 +- .../configvariations/ConfigVariations.java | 2 +- .../testsuites/IgniteCacheMvccTestSuite.java | 2 + .../processors/query/h2/IgniteH2Indexing.java | 102 ++++-- .../query/h2/PreparedStatementEx.java | 7 +- .../query/h2/dml/UpdatePlanBuilder.java | 3 +- .../query/h2/sql/GridSqlQueryParser.java | 2 + ...sactionsCommandsWithMvccEnabledSelfTest.java | 4 +- .../cache/index/SqlTransactionsSelfTest.java | 5 +- .../cache/mvcc/CacheMvccBulkLoadTest.java | 2 +- .../cache/mvcc/CacheMvccDmlSimpleTest.java | 2 +- ...cheMvccSelectForUpdateQueryAbstractTest.java | 19 +- .../cache/mvcc/CacheMvccSizeTest.java | 4 +- ...CacheMvccSqlConfigurationValidationTest.java | 111 +++++++ .../mvcc/CacheMvccSqlTxQueriesAbstractTest.java | 4 +- .../mvcc/CacheMvccStreamingInsertTest.java | 2 +- .../query/h2/GridIndexRebuildSelfTest.java | 2 +- ...GridIndexRebuildWithMvccEnabledSelfTest.java | 5 +- .../testsuites/IgniteCacheMvccSqlTestSuite.java | 2 + .../cpp/odbc-test/config/queries-default.xml | 198 ++++++------ .../odbc-test/config/queries-transaction-32.xml | 11 +- .../odbc-test/config/queries-transaction.xml | 11 +- .../config/mvcc/benchmark-mvcc-messages.sh | 19 +- .../config/mvcc/benchmark-mvcc-processor.sh | 15 +- .../mvcc/benchmark-mvcc-updates-contention.sh | 15 +- .../mvcc/benchmark-thin-native.properties | 47 ++- ...benchmark-jdbc-thin-inmemory-mvcc.properties | 21 +- .../yardstick/IgniteBenchmarkArguments.java | 18 +- .../org/apache/ignite/yardstick/IgniteNode.java | 2 - .../mvcc/MvccUpdateContentionBenchmark.java | 5 +- 62 files changed, 1292 insertions(+), 770 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab94934/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionMvccEnabledSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionMvccEnabledSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionMvccEnabledSelfTest.java index 051d1d2..0196cb2 100644 --- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionMvccEnabledSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionMvccEnabledSelfTest.java @@ -23,6 +23,7 @@ import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Savepoint; import java.util.concurrent.Callable; +import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.binary.BinaryMarshaller; @@ -67,8 +68,6 @@ public class JdbcThinConnectionMvccEnabledSelfTest extends JdbcThinAbstractSelfT cfg.setGridLogger(new GridStringLogger()); - cfg.setMvccEnabled(true); - return cfg; } @@ -81,6 +80,7 @@ public class JdbcThinConnectionMvccEnabledSelfTest extends JdbcThinAbstractSelfT CacheConfiguration cfg = defaultCacheConfiguration(); cfg.setName(name); + cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT); return cfg; } http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab94934/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinTransactionsAbstractComplexSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinTransactionsAbstractComplexSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinTransactionsAbstractComplexSelfTest.java index 28c65a9..68ed36b 100644 --- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinTransactionsAbstractComplexSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinTransactionsAbstractComplexSelfTest.java @@ -116,15 +116,13 @@ public abstract class JdbcThinTransactionsAbstractComplexSelfTest extends JdbcTh @Override protected IgniteConfiguration getConfiguration(String testIgniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(testIgniteInstanceName); - cfg.setMvccEnabled(true); - CacheConfiguration<Integer, Person> ccfg = new CacheConfiguration<>("Person"); ccfg.setIndexedTypes(Integer.class, Person.class); ccfg.getQueryEntities().iterator().next().setKeyFieldName("id"); - ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT); ccfg.setCacheMode(CacheMode.PARTITIONED); @@ -145,14 +143,14 @@ public abstract class JdbcThinTransactionsAbstractComplexSelfTest extends JdbcTh execute("ALTER TABLE \"Person\".person add if not exists companyid int"); execute("CREATE TABLE City (id int primary key, name varchar, population int) WITH " + - "\"atomicity=transactional,template=partitioned,backups=3,cache_name=City\""); + "\"atomicity=transactional_snapshot,template=partitioned,backups=3,cache_name=City\""); execute("CREATE TABLE Company (id int, \"cityid\" int, name varchar, primary key (id, \"cityid\")) WITH " + - "\"atomicity=transactional,template=partitioned,backups=1,wrap_value=false,affinity_key=cityid," + + "\"atomicity=transactional_snapshot,template=partitioned,backups=1,wrap_value=false,affinity_key=cityid," + "cache_name=Company\""); execute("CREATE TABLE Product (id int primary key, name varchar, companyid int) WITH " + - "\"atomicity=transactional,template=partitioned,backups=2,cache_name=Product\""); + "\"atomicity=transactional_snapshot,template=partitioned,backups=2,cache_name=Product\""); execute("CREATE INDEX IF NOT EXISTS prodidx ON Product(companyid)"); http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab94934/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinTransactionsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinTransactionsSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinTransactionsSelfTest.java index 1619996..a8fa47b 100644 --- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinTransactionsSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinTransactionsSelfTest.java @@ -26,6 +26,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; @@ -58,8 +59,6 @@ public class JdbcThinTransactionsSelfTest extends JdbcThinAbstractSelfTest { @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); - cfg.setMvccEnabled(true); - cfg.setCacheConfiguration(cacheConfiguration(DEFAULT_CACHE_NAME)); TcpDiscoverySpi disco = new TcpDiscoverySpi(); @@ -84,6 +83,7 @@ public class JdbcThinTransactionsSelfTest extends JdbcThinAbstractSelfTest { CacheConfiguration cfg = defaultCacheConfiguration(); cfg.setName(name); + cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT); return cfg; } @@ -97,7 +97,7 @@ public class JdbcThinTransactionsSelfTest extends JdbcThinAbstractSelfTest { try (Connection c = c(true, NestedTxMode.ERROR)) { try (Statement s = c.createStatement()) { s.execute("CREATE TABLE INTS (k int primary key, v int) WITH \"cache_name=ints,wrap_value=false," + - "atomicity=transactional\""); + "atomicity=transactional_snapshot\""); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab94934/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinTransactionsWithMvccEnabledSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinTransactionsWithMvccEnabledSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinTransactionsWithMvccEnabledSelfTest.java index e3f7f14..e01a53d 100644 --- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinTransactionsWithMvccEnabledSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinTransactionsWithMvccEnabledSelfTest.java @@ -26,6 +26,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; @@ -58,8 +59,6 @@ public class JdbcThinTransactionsWithMvccEnabledSelfTest extends JdbcThinAbstrac @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); - cfg.setMvccEnabled(true); - cfg.setCacheConfiguration(cacheConfiguration(DEFAULT_CACHE_NAME)); TcpDiscoverySpi disco = new TcpDiscoverySpi(); @@ -84,6 +83,7 @@ public class JdbcThinTransactionsWithMvccEnabledSelfTest extends JdbcThinAbstrac CacheConfiguration cfg = defaultCacheConfiguration(); cfg.setName(name); + cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT); return cfg; } @@ -97,7 +97,7 @@ public class JdbcThinTransactionsWithMvccEnabledSelfTest extends JdbcThinAbstrac try (Connection c = c(true, NestedTxMode.ERROR)) { try (Statement s = c.createStatement()) { s.execute("CREATE TABLE INTS (k int primary key, v int) WITH \"cache_name=ints,wrap_value=false," + - "atomicity=transactional\""); + "atomicity=transactional_snapshot\""); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab94934/modules/core/src/main/java/org/apache/ignite/cache/CacheAtomicityMode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheAtomicityMode.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheAtomicityMode.java index 79a8e5f..1584286 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/CacheAtomicityMode.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheAtomicityMode.java @@ -33,8 +33,12 @@ import org.jetbrains.annotations.Nullable; */ public enum CacheAtomicityMode { /** - * Specified fully {@code ACID}-compliant transactional cache behavior. See + * Specified fully {@code ACID}-compliant transactional cache behavior for key-value API. See * {@link Transaction} for more information about transactions. + * <p> + * <b>Note:</b> this mode guaranties transactional behavior <b>only for key-value API</b> operations. + * For ACID SQL transactions use {@code CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT} mode. + * </p> */ TRANSACTIONAL, @@ -88,7 +92,32 @@ public enum CacheAtomicityMode { * * @see IgniteCache#withNoRetries() */ - ATOMIC; + ATOMIC, + + /** + * Specified fully {@code ACID}-compliant transactional cache behavior not only for key-value API, + * but also for SQL transactions. + * <p> + * This cache atomicity mode is implemented within multiversion concurrency control (MVCC) where database can + * contain multiple versions of each row to allow readers do not collide with writers. + * Each update in this mode generates a new version of a row and don't remove a previous one. + * Old versions are cleaned only when they are not visible to anyone. + * </p> + * <p> + * There is one node in cluster is elected as MVCC coordinator. This node tracks all in-flight transactions and + * queries in the cluster. + * Each transaction or query over the cache with {@code TRANSACTIONAL_SNAPSHOT} mode obtains current + * database snapshot from the coordinator. This snapshot allows transactions and queries to skip invisible + * updates made by concurrent transactions to always observe the same consistent database state. + * </p> + * <p> + * <b>Note!</b> This atomicity mode is not interoperable with the other atomicity modes in the same transaction. + * Caches participated in transaction should either be all {@code TRANSACTIONAL} or all + * {@code TRANSACTIONAL_SNAPSHOT}, but not the mixed ones. + * </p> + * See {@link Transaction} for more information about transactions. + */ + TRANSACTIONAL_SNAPSHOT; /** Enumerated values. */ private static final CacheAtomicityMode[] VALS = values(); http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab94934/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index 3060caa..6a0c7cb 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -218,8 +218,7 @@ public class IgniteConfiguration { public static final int DFLT_MVCC_VACUUM_THREAD_CNT = 2; /** Default time interval between vacuum process runs (ms). */ - public static final int DFLT_MVCC_VACUUM_TIME_INTERVAL = 5000; - + public static final int DFLT_MVCC_VACUUM_FREQUENCY = 5000; /** Optional local Ignite instance name. */ private String igniteInstanceName; @@ -494,14 +493,11 @@ public class IgniteConfiguration { /** Client connector configuration. */ private ClientConnectorConfiguration cliConnCfg = ClientListenerProcessor.DFLT_CLI_CFG; - /** Flag whether MVCC is enabled. */ - private boolean mvccEnabled; - /** Size of MVCC vacuum thread pool. */ private int mvccVacuumThreadCnt = DFLT_MVCC_VACUUM_THREAD_CNT; /** Time interval between vacuum process runs (ms). */ - private int mvccVacuumTimeInterval = DFLT_MVCC_VACUUM_TIME_INTERVAL; + private int mvccVacuumFreq = DFLT_MVCC_VACUUM_FREQUENCY; /** User authentication enabled. */ private boolean authEnabled; @@ -595,9 +591,8 @@ public class IgniteConfiguration { metricsLogFreq = cfg.getMetricsLogFrequency(); metricsUpdateFreq = cfg.getMetricsUpdateFrequency(); mgmtPoolSize = cfg.getManagementThreadPoolSize(); - mvccEnabled = cfg.isMvccEnabled(); - mvccVacuumThreadCnt = cfg.mvccVacuumThreadCnt; - mvccVacuumTimeInterval = cfg.mvccVacuumTimeInterval; + mvccVacuumThreadCnt = cfg.getMvccVacuumThreadCount(); + mvccVacuumFreq = cfg.getMvccVacuumFrequency(); netTimeout = cfg.getNetworkTimeout(); nodeId = cfg.getNodeId(); odbcCfg = cfg.getOdbcConfiguration(); @@ -3003,32 +2998,11 @@ public class IgniteConfiguration { } /** - * Whether or not MVCC is enabled. - * - * @return {@code True} if MVCC is enabled. - */ - public boolean isMvccEnabled() { - return mvccEnabled; - } - - /** - * Sets MVCC enabled flag. - * - * @param mvccEnabled MVCC enabled flag. - * @return {@code this} for chaining. - */ - public IgniteConfiguration setMvccEnabled(boolean mvccEnabled) { - this.mvccEnabled = mvccEnabled; - - return this; - } - - /** * Returns number of MVCC vacuum cleanup threads. * * @return Number of MVCC vacuum cleanup threads. */ - public int getMvccVacuumThreadCnt() { + public int getMvccVacuumThreadCount() { return mvccVacuumThreadCnt; } @@ -3038,7 +3012,7 @@ public class IgniteConfiguration { * @param mvccVacuumThreadCnt Number of MVCC vacuum cleanup threads. * @return {@code this} for chaining. */ - public IgniteConfiguration setMvccVacuumThreadCnt(int mvccVacuumThreadCnt) { + public IgniteConfiguration setMvccVacuumThreadCount(int mvccVacuumThreadCnt) { this.mvccVacuumThreadCnt = mvccVacuumThreadCnt; return this; @@ -3049,18 +3023,18 @@ public class IgniteConfiguration { * * @return Time interval between vacuum runs. */ - public int getMvccVacuumTimeInterval() { - return mvccVacuumTimeInterval; + public int getMvccVacuumFrequency() { + return mvccVacuumFreq; } /** * Sets time interval between vacuum runs. * - * @param mvccVacuumTimeInterval Time interval between vacuum runs. + * @param mvccVacuumFreq Time interval between vacuum runs. * @return {@code this} for chaining. */ - public IgniteConfiguration setMvccVacuumTimeInterval(int mvccVacuumTimeInterval) { - this.mvccVacuumTimeInterval = mvccVacuumTimeInterval; + public IgniteConfiguration setMvccVacuumFrequency(int mvccVacuumFreq) { + this.mvccVacuumFreq = mvccVacuumFreq; return this; } http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab94934/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 230c05c..d74b3aa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -125,7 +125,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheUtilityKey; import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; -import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils; +import org.apache.ignite.internal.processors.cache.mvcc.MvccProcessorImpl; import org.apache.ignite.internal.processors.cache.persistence.DataRegion; import org.apache.ignite.internal.processors.cache.persistence.filename.PdsConsistentIdProcessor; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; @@ -994,7 +994,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { // be able to start receiving messages once discovery completes. try { startProcessor(new PdsConsistentIdProcessor(ctx)); - startProcessor(MvccUtils.createProcessor(ctx)); + startProcessor(new MvccProcessorImpl(ctx)); startProcessor(createComponent(DiscoveryNodeValidationProcessor.class, ctx)); startProcessor(new GridAffinityProcessor(ctx)); startProcessor(createComponent(GridSegmentationProcessor.class, ctx)); http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab94934/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java index 539727d..5b764e4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java @@ -202,9 +202,6 @@ public final class IgniteNodeAttributes { /** Internal attribute name constant. */ public static final String ATTR_DYNAMIC_CACHE_START_ROLLBACK_SUPPORTED = ATTR_PREFIX + ".dynamic.cache.start.rollback.supported"; - /** Mvcc enabled flag. */ - public static final String ATTR_MVCC_ENABLED = ATTR_PREFIX + ".mvcc.enabled"; - /** * Enforces singleton. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab94934/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 48e3318..b9af961 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -162,7 +162,6 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_LATE_AFFINITY import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER_USE_BINARY_STRING_SER_VER_2; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER_USE_DFLT_SUID; -import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MVCC_ENABLED; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_OFFHEAP_SIZE; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_PEER_CLASSLOADING; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_PHY_RAM; @@ -658,7 +657,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { updateClientNodes(node.id()); } - ctx.coordinators().onDiscoveryEvent(type, topSnapshot, topVer); + ctx.coordinators().onDiscoveryEvent(type, topSnapshot, topVer, customMsg); boolean locJoinEvt = type == EVT_NODE_JOINED && node.id().equals(locNode.id()); @@ -1222,8 +1221,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { Boolean locSrvcCompatibilityEnabled = locNode.attribute(ATTR_SERVICES_COMPATIBILITY_MODE); Boolean locSecurityCompatibilityEnabled = locNode.attribute(ATTR_SECURITY_COMPATIBILITY_MODE); - Boolean locMvccEnabled = locNode.attribute(ATTR_MVCC_ENABLED); - for (ClusterNode n : nodes) { int rmtJvmMajVer = nodeJavaMajorVersion(n); @@ -1321,17 +1318,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { ", locNodeId=" + locNode.id() + ", rmtNode=" + U.toShortString(n) + "]"); } - Boolean rmtMvccEnabled = n.attribute(ATTR_MVCC_ENABLED); - - if (!F.eq(locMvccEnabled, rmtMvccEnabled)) { - throw new IgniteCheckedException("Remote node has MVCC mode different from local " + - "[locId8=" + U.id8(locNode.id()) + - ", locMvccMode=" + (Boolean.TRUE.equals(locMvccEnabled) ? "ENABLED" : "DISABLED") + - ", rmtId8=" + U.id8(n.id()) + - ", rmtMvccMode=" + (Boolean.TRUE.equals(rmtMvccEnabled) ? "ENABLED" : "DISABLED") + - ", rmtAddrs=" + U.addressesAsString(n) + ", rmtNode=" + U.toShortString(n) + "]"); - } - if (n.version().compareToIgnoreTimestamp(SERVICE_PERMISSIONS_SINCE) >= 0 && ctx.security().enabled() // Matters only if security enabled. ) { @@ -2388,7 +2374,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { Collection<ClusterNode> topSnapshot) { assert topSnapshot.contains(loc); - MvccCoordinator mvccCrd = ctx.coordinators().coordinatorFromDiscoveryEvent(); + MvccCoordinator mvccCrd = ctx.coordinators().assignedCoordinator(); HashSet<UUID> alives = U.newHashSet(topSnapshot.size()); HashMap<UUID, ClusterNode> nodeMap = U.newHashMap(topSnapshot.size()); http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab94934/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java index cc2c17c..2290ce6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java @@ -203,7 +203,10 @@ public class GridAffinityAssignmentCache { * @param affAssignment Affinity assignment for topology version. */ public void initialize(AffinityTopologyVersion topVer, List<List<ClusterNode>> affAssignment) { - MvccCoordinator mvccCrd = ctx.cache().context().coordinators().currentCoordinator(topVer); + MvccCoordinator mvccCrd = null; + + if (!locCache) + mvccCrd = ctx.cache().context().coordinators().currentCoordinator(topVer); initialize(topVer, affAssignment, mvccCrd); } http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab94934/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java index c100d16..b92280a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java @@ -30,7 +30,6 @@ import org.apache.ignite.cache.affinity.AffinityFunction; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.DataPageEvictionMode; -import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.TopologyValidator; import org.apache.ignite.events.CacheRebalancingEvent; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; @@ -42,7 +41,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffini import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopologyImpl; -import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionsEvictManager; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; import org.apache.ignite.internal.processors.cache.persistence.DataRegion; import org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager; @@ -61,7 +59,7 @@ import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.mxbean.CacheGroupMetricsMXBean; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT; import static org.apache.ignite.cache.CacheMode.LOCAL; import static org.apache.ignite.cache.CacheMode.REPLICATED; import static org.apache.ignite.cache.CacheRebalanceMode.NONE; @@ -218,26 +216,13 @@ public class CacheGroupContext { storeCacheId = affNode && dataRegion.config().getPageEvictionMode() != DataPageEvictionMode.DISABLED; + mvccEnabled = ccfg.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT; + log = ctx.kernalContext().log(getClass()); caches = new ArrayList<>(); mxBean = new CacheGroupMetricsMXBeanImpl(this); - - mvccEnabled = mvccEnabled(ctx.gridConfig(), ccfg, cacheType); - } - - /** - * @param cfg Ignite configuration. - * @param ccfg Cache configuration. - * @param cacheType Cache typr. - * @return {@code True} if mvcc is enabled for given cache. - */ - public static boolean mvccEnabled(IgniteConfiguration cfg, CacheConfiguration ccfg, CacheType cacheType) { - return cfg.isMvccEnabled() && - cacheType == CacheType.USER && - ccfg.getCacheMode() != LOCAL && - ccfg.getAtomicityMode() == TRANSACTIONAL; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab94934/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java index 8eab9c0..572e33e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java @@ -69,6 +69,7 @@ import org.apache.ignite.plugin.PluginProvider; import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT; import static org.apache.ignite.cache.CacheMode.LOCAL; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; @@ -147,7 +148,7 @@ class ClusterCachesInfo { if (ccfg == null) grpCfgs.put(info.cacheData().config().getGroupName(), info.cacheData().config()); else - validateCacheGroupConfiguration(ccfg, info.cacheData().config(), info.cacheType()); + validateCacheGroupConfiguration(ccfg, info.cacheData().config()); } String conflictErr = processJoiningNode(joinDiscoData, ctx.localNodeId(), true); @@ -220,7 +221,7 @@ class ClusterCachesInfo { } if (checkConsistency) - validateStartCacheConfiguration(locCfg, cacheData.cacheType()); + validateStartCacheConfiguration(locCfg); } } @@ -1864,17 +1865,16 @@ class ClusterCachesInfo { /** * @param ccfg Cache configuration to start. - * @param cacheType Cache type. * @throws IgniteCheckedException If failed. */ - void validateStartCacheConfiguration(CacheConfiguration ccfg, CacheType cacheType) throws IgniteCheckedException { + void validateStartCacheConfiguration(CacheConfiguration ccfg) throws IgniteCheckedException { if (ccfg.getGroupName() != null) { CacheGroupDescriptor grpDesc = cacheGroupByName(ccfg.getGroupName()); if (grpDesc != null) { assert ccfg.getGroupName().equals(grpDesc.groupName()); - validateCacheGroupConfiguration(grpDesc.config(), ccfg, cacheType); + validateCacheGroupConfiguration(grpDesc.config(), ccfg); } } } @@ -1882,10 +1882,9 @@ class ClusterCachesInfo { /** * @param cfg Existing configuration. * @param startCfg Cache configuration to start. - * @param cacheType Cache type. * @throws IgniteCheckedException If validation failed. */ - private void validateCacheGroupConfiguration(CacheConfiguration cfg, CacheConfiguration startCfg, CacheType cacheType) + private void validateCacheGroupConfiguration(CacheConfiguration cfg, CacheConfiguration startCfg) throws IgniteCheckedException { GridCacheAttributes attr1 = new GridCacheAttributes(cfg); GridCacheAttributes attr2 = new GridCacheAttributes(startCfg); @@ -1893,10 +1892,9 @@ class ClusterCachesInfo { CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "cacheMode", "Cache mode", cfg.getCacheMode(), startCfg.getCacheMode(), true); - CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "mvccEnabled", "MVCC mode", - CacheGroupContext.mvccEnabled(ctx.config(), cfg, cacheType), - CacheGroupContext.mvccEnabled(ctx.config(), startCfg, cacheType), - true); + if (cfg.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT || startCfg.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT) + CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "atomicityMode", "Atomicity mode", + attr1.atomicityMode(), attr2.atomicityMode(), true); CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "affinity", "Affinity function", attr1.cacheAffinityClassName(), attr2.cacheAffinityClassName(), true); http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab94934/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index ac9de7c..7d200eb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -112,6 +112,7 @@ import org.jetbrains.annotations.Nullable; import static org.apache.ignite.IgniteSystemProperties.IGNITE_READ_LOAD_BALANCING; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED; @@ -817,7 +818,14 @@ public class GridCacheContext<K, V> implements Externalizable { * @return {@code True} if transactional. */ public boolean transactional() { - return cacheCfg.getAtomicityMode() == TRANSACTIONAL; + return cacheCfg.getAtomicityMode() == TRANSACTIONAL || cacheCfg.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT; + } + + /** + * @return {@code True} if transactional snapshot. + */ + public boolean transactionalSnapshot() { + return cacheCfg.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab94934/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index e0a2420..f000da7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -35,7 +35,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.stream.Collectors; -import javax.cache.expiry.EternalExpiryPolicy; import javax.management.MBeanServer; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; @@ -165,7 +164,7 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_REMOVED_ENTR import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK; import static org.apache.ignite.IgniteSystemProperties.getBoolean; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; -import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT; import static org.apache.ignite.cache.CacheMode.LOCAL; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheMode.REPLICATED; @@ -278,6 +277,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { throws IgniteCheckedException { CU.initializeConfigDefaults(log, cfg, cacheObjCtx); + ctx.coordinators().preProcessCacheConfiguration(cfg); ctx.igfsHelper().preProcessCacheConfiguration(cfg); } @@ -525,6 +525,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { } ctx.igfsHelper().validateCacheConfiguration(cc); + ctx.coordinators().validateCacheConfiguration(cc); if (cc.getAtomicityMode() == ATOMIC) assertParameter(cc.getTransactionManagerLookupClassName() == null, @@ -543,24 +544,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { "other cache types [cacheName=" + cc.getName() + ", groupName=" + cc.getGroupName() + ", cacheType=" + cacheType + "]"); - if (cc.getAtomicityMode() == TRANSACTIONAL && c.isMvccEnabled()) { - if (cc.getCacheStoreFactory() != null) { - throw new IgniteCheckedException("Transactional cache may not have a third party cache store when " + - "MVCC is enabled."); - } - - if (cc.getExpiryPolicyFactory() != null && !(cc.getExpiryPolicyFactory().create() instanceof - EternalExpiryPolicy)) { - throw new IgniteCheckedException("Transactional cache may not have expiry policy when " + - "MVCC is enabled."); - } - - if (cc.getInterceptor() != null) { - throw new IgniteCheckedException("Transactional cache may not have an interceptor when " + - "MVCC is enabled."); - } - } - // Make sure we do not use sql schema for system views. if (ctx.query().moduleEnabled()) { String schema = QueryUtils.normalizeSchemaName(cc.getName(), cc.getSqlSchema()); @@ -827,14 +810,42 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (CU.isPersistenceEnabled(ctx.config()) && ctx.cache().context().pageStore() != null) { Map<String, StoredCacheData> storedCaches = ctx.cache().context().pageStore().readCacheConfigurations(); - if (!F.isEmpty(storedCaches)) + if (!F.isEmpty(storedCaches)) { for (StoredCacheData storedCacheData : storedCaches.values()) { String cacheName = storedCacheData.config().getName(); //Ignore stored caches if it already added by static config(static config has higher priority). if (!caches.containsKey(cacheName)) addStoredCache(caches, storedCacheData, cacheName, cacheType(cacheName), false); + else { + CacheConfiguration cfg = caches.get(cacheName).cacheData().config(); + CacheConfiguration cfgFromStore = storedCacheData.config(); + + validateCacheConfigurationOnRestore(cfg, cfgFromStore); + } } + } + } + } + + /** + * Validates cache configuration against stored cache configuration when persistence is enabled. + * + * @param cfg Configured cache configuration. + * @param cfgFromStore Stored cache configuration + * @throws IgniteCheckedException If validation failed. + */ + private void validateCacheConfigurationOnRestore(CacheConfiguration cfg, CacheConfiguration cfgFromStore) + throws IgniteCheckedException { + assert cfg != null && cfgFromStore != null; + + if ((cfg.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT || + cfgFromStore.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT) + && cfg.getAtomicityMode() != cfgFromStore.getAtomicityMode()) { + throw new IgniteCheckedException("Cannot start cache. Statically configured atomicity mode differs from " + + "previously stored configuration. Please check your configuration: [cacheName=" + cfg.getName() + + ", configuredAtomicityMode=" + cfg.getAtomicityMode() + + ", storedAtomicityMode=" + cfgFromStore.getAtomicityMode() + "]"); } } @@ -1462,6 +1473,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { pluginMgr.validate(); + if (cfg.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT) + sharedCtx.coordinators().ensureStarted(); + sharedCtx.jta().registerCache(cfg); // Skip suggestions for internal caches. @@ -1532,7 +1546,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { switch (cfg.getCacheMode()) { case LOCAL: { switch (cfg.getAtomicityMode()) { - case TRANSACTIONAL: { + case TRANSACTIONAL: + case TRANSACTIONAL_SNAPSHOT: { cache = new GridLocalCache(cacheCtx); break; @@ -1554,7 +1569,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { case REPLICATED: { if (nearEnabled) { switch (cfg.getAtomicityMode()) { - case TRANSACTIONAL: { + case TRANSACTIONAL: + case TRANSACTIONAL_SNAPSHOT: { cache = new GridNearTransactionalCache(cacheCtx); break; @@ -1572,7 +1588,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { } else { switch (cfg.getAtomicityMode()) { - case TRANSACTIONAL: { + case TRANSACTIONAL: + case TRANSACTIONAL_SNAPSHOT: { cache = cacheCtx.affinityNode() ? new GridDhtColocatedCache(cacheCtx) : new GridDhtColocatedCache(cacheCtx, new GridNoStorageCacheMap()); @@ -1662,7 +1679,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { GridDhtCacheAdapter dht = null; switch (cfg.getAtomicityMode()) { - case TRANSACTIONAL: { + case TRANSACTIONAL: + case TRANSACTIONAL_SNAPSHOT: { assert cache instanceof GridNearTransactionalCache; GridNearTransactionalCache near = (GridNearTransactionalCache)cache; @@ -3428,7 +3446,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { CacheConfiguration ccfg = req.startCacheConfiguration(); try { - cachesInfo.validateStartCacheConfiguration(ccfg, req.cacheType()); + cachesInfo.validateStartCacheConfiguration(ccfg); } catch (IgniteCheckedException e) { fut.onDone(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab94934/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccPreviousCoordinatorQueries.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccPreviousCoordinatorQueries.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccPreviousCoordinatorQueries.java index 024ef70..cd7560f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccPreviousCoordinatorQueries.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccPreviousCoordinatorQueries.java @@ -24,7 +24,6 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.typedef.F; @@ -56,17 +55,17 @@ class MvccPreviousCoordinatorQueries { /** * @param nodeQueries Active queries map. - * @param discoCache Discovery data. + * @param nodes Cluster nodes. * @param mgr Discovery manager. */ - void init(Map<UUID, GridLongList> nodeQueries, DiscoCache discoCache, GridDiscoveryManager mgr) { + void init(Map<UUID, GridLongList> nodeQueries, Collection<ClusterNode> nodes, GridDiscoveryManager mgr) { synchronized (this) { assert !initDone; assert waitNodes == null; waitNodes = new HashSet<>(); - for (ClusterNode node : discoCache.allNodes()) { + for (ClusterNode node : nodes) { if ((nodeQueries == null || !nodeQueries.containsKey(node.id())) && mgr.alive(node) && !F.contains(rcvd, node.id())) http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab94934/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java index bce61a1..a09468f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java @@ -23,10 +23,12 @@ import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.IgniteDiagnosticPrepareContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.processors.GridProcessor; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.ExchangeContext; @@ -43,8 +45,10 @@ public interface MvccProcessor extends GridProcessor { * @param evtType Event type. * @param nodes Current nodes. * @param topVer Topology version. + * @param customMsg Message */ - void onDiscoveryEvent(int evtType, Collection<ClusterNode> nodes, long topVer); + void onDiscoveryEvent(int evtType, Collection<ClusterNode> nodes, long topVer, + @Nullable DiscoveryCustomMessage customMsg); /** * Exchange start callback. @@ -71,6 +75,11 @@ public interface MvccProcessor extends GridProcessor { void processClientActiveQueries(UUID nodeId, @Nullable GridLongList activeQueries); /** + * @return Mvcc coordinator received from discovery event. + */ + @Nullable MvccCoordinator assignedCoordinator(); + + /** * @return Coordinator. */ @Nullable MvccCoordinator currentCoordinator(); @@ -83,11 +92,6 @@ public interface MvccProcessor extends GridProcessor { @Nullable MvccCoordinator currentCoordinator(AffinityTopologyVersion topVer); /** - * @return Mvcc coordinator received from discovery event. - */ - @Nullable MvccCoordinator coordinatorFromDiscoveryEvent(); - - /** * @return Current coordinator node ID. */ UUID currentCoordinatorId(); @@ -247,4 +251,32 @@ public interface MvccProcessor extends GridProcessor { * @param diagCtx Diagnostic request. */ void dumpDebugInfo(IgniteLogger log, @Nullable IgniteDiagnosticPrepareContext diagCtx); + + /** + * @return {@code True} if at least one cache with + * {@code CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT} mode is registered. + */ + boolean mvccEnabled(); + + /** + * Pre-processes cache configuration before start. + * + * @param ccfg Cache configuration to pre-process. + */ + void preProcessCacheConfiguration(CacheConfiguration ccfg); + + /** + * Validates cache configuration before start. + * + * @param ccfg Cache configuration to validate. + * @throws IgniteCheckedException If validation failed. + */ + void validateCacheConfiguration(CacheConfiguration ccfg) throws IgniteCheckedException; + + /** + * Starts MVCC processor (i.e. initialises data structures and vacuum) if it has not been started yet. + * + * @throws IgniteCheckedException If failed to initialize. + */ + void ensureStarted() throws IgniteCheckedException; } http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab94934/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java index 220f0c0..d1e2dc5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java @@ -28,9 +28,13 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.expiry.EternalExpiryPolicy; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.events.DiscoveryEvent; @@ -40,15 +44,17 @@ import org.apache.ignite.internal.IgniteDiagnosticPrepareContext; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; -import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.GridProcessorAdapter; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheGroupContext; +import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; +import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest; import org.apache.ignite.internal.processors.cache.ExchangeContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; @@ -92,13 +98,14 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.IgniteNodeValidationResult; -import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT; import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; @@ -127,7 +134,10 @@ import static org.apache.ignite.internal.processors.cache.persistence.CacheDataR * MVCC processor. */ @SuppressWarnings("serial") -class MvccProcessorImpl extends GridProcessorAdapter implements MvccProcessor, DatabaseLifecycleListener { +public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProcessor, DatabaseLifecycleListener { + /** */ + private static final IgniteProductVersion MVCC_SUPPORTED_SINCE = IgniteProductVersion.fromString("2.7.0"); + /** */ private static final Waiter LOCAL_TRANSACTION_MARKER = new LocalTransactionMarker(); @@ -153,6 +163,9 @@ class MvccProcessorImpl extends GridProcessorAdapter implements MvccProcessor, D private volatile MvccCoordinator curCrd; /** */ + private volatile MvccCoordinator assignedCrd; + + /** */ private TxLog txLog; /** */ @@ -170,9 +183,6 @@ class MvccProcessorImpl extends GridProcessorAdapter implements MvccProcessor, D private volatile Throwable vacuumError; /** */ - private MvccDiscoveryData discoData = new MvccDiscoveryData(null); - - /** */ private final GridAtomicLong futIdCntr = new GridAtomicLong(0); /** */ @@ -208,6 +218,12 @@ class MvccProcessorImpl extends GridProcessorAdapter implements MvccProcessor, D /** */ private final GridFutureAdapter<Void> initFut = new GridFutureAdapter<>(); + /** Flag whether at least one cache with {@code CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT} mode is registered. */ + private volatile boolean mvccEnabled; + + /** Flag whether all nodes in cluster support MVCC. */ + private volatile boolean mvccSupported = true; + /** * @param ctx Context. */ @@ -219,8 +235,6 @@ class MvccProcessorImpl extends GridProcessorAdapter implements MvccProcessor, D /** {@inheritDoc} */ @Override public void start() throws IgniteCheckedException { - ctx.addNodeAttribute(IgniteNodeAttributes.ATTR_MVCC_ENABLED, Boolean.TRUE); - ctx.event().addLocalEventListener(new CacheCoordinatorNodeFailListener(), EVT_NODE_FAILED, EVT_NODE_LEFT); @@ -228,12 +242,57 @@ class MvccProcessorImpl extends GridProcessorAdapter implements MvccProcessor, D } /** {@inheritDoc} */ - @Nullable @Override public IgniteNodeValidationResult validateNode(ClusterNode node) { - Boolean rmtEnabled = node.attribute(IgniteNodeAttributes.ATTR_MVCC_ENABLED); + @Override public boolean mvccEnabled() { + return mvccEnabled; + } + + /** {@inheritDoc} */ + @Override public void preProcessCacheConfiguration(CacheConfiguration ccfg) { + if (ccfg.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT) { + if (!mvccSupported) + throw new IgniteException("Cannot start MVCC transactional cache. " + + "MVCC is unsupported by the cluster."); + + mvccEnabled = true; + } + } + + /** {@inheritDoc} */ + @Override public void validateCacheConfiguration(CacheConfiguration ccfg) throws IgniteCheckedException { + if (ccfg.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT) { + if (!mvccSupported) + throw new IgniteException("Cannot start MVCC transactional cache. " + + "MVCC is unsupported by the cluster."); + + if (ccfg.getCacheStoreFactory() != null) { + throw new IgniteCheckedException("Transactional cache may not have a third party cache store when " + + "MVCC is enabled."); + } + + if (ccfg.getExpiryPolicyFactory() != null && !(ccfg.getExpiryPolicyFactory().create() instanceof + EternalExpiryPolicy)) { + throw new IgniteCheckedException("Transactional cache may not have expiry policy when " + + "MVCC is enabled."); + } + + if (ccfg.getInterceptor() != null) { + throw new IgniteCheckedException("Transactional cache may not have an interceptor when " + + "MVCC is enabled."); + } + + if (ccfg.getCacheMode() == CacheMode.LOCAL) + throw new IgniteCheckedException("Local caches are not supported by MVCC engine. Use " + + "CacheAtomicityMode.TRANSACTIONAL for local caches."); + + mvccEnabled = true; + } + } - if (rmtEnabled == null) { - String errMsg = "Failed to add node to topology because MVCC is enabled on cluster and " + - "the node doesn't support MVCC or MVCC is disabled for the node [nodeId=" + node.id() + ']'; + /** {@inheritDoc} */ + @Nullable @Override public IgniteNodeValidationResult validateNode(ClusterNode node) { + if (mvccEnabled && node.version().compareToIgnoreTimestamp(MVCC_SUPPORTED_SINCE) < 0) { + String errMsg = "Failed to add node to topology. MVCC is enabled on the cluster, but " + + "the node doesn't support MVCC [nodeId=" + node.id() + ']'; return new IgniteNodeValidationResult(node.id(), errMsg, errMsg); } @@ -242,6 +301,19 @@ class MvccProcessorImpl extends GridProcessorAdapter implements MvccProcessor, D } /** {@inheritDoc} */ + @Override public void ensureStarted() throws IgniteCheckedException { + if (!ctx.clientNode() && txLog == null) { + assert mvccEnabled && mvccSupported; + + txLog = new TxLog(ctx, ctx.cache().context().database()); + + startVacuumWorkers(); + + log.info("Mvcc processor started."); + } + } + + /** {@inheritDoc} */ @Override public void beforeStop(IgniteCacheDatabaseSharedManager mgr) { stopVacuumWorkers(); @@ -250,6 +322,7 @@ class MvccProcessorImpl extends GridProcessorAdapter implements MvccProcessor, D /** {@inheritDoc} */ @Override public void onInitDataRegions(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException { + // We have to always init txLog data region. DataStorageConfiguration dscfg = dataStorageConfiguration(); mgr.addDataRegion( @@ -260,13 +333,7 @@ class MvccProcessorImpl extends GridProcessorAdapter implements MvccProcessor, D /** {@inheritDoc} */ @Override public void afterInitialise(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException { - if (!CU.isPersistenceEnabled(ctx.config())) { - assert txLog == null; - - txLog = new TxLog(ctx, mgr); - - startVacuumWorkers(); - } + // No-op. } /** {@inheritDoc} */ @@ -281,87 +348,19 @@ class MvccProcessorImpl extends GridProcessorAdapter implements MvccProcessor, D /** {@inheritDoc} */ @Override public void afterMemoryRestore(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException { - assert CU.isPersistenceEnabled(ctx.config()); - assert txLog == null; - - txLog = new TxLog(ctx, mgr); - - startVacuumWorkers(); - } - - /** {@inheritDoc} */ - @Override public DiscoveryDataExchangeType discoveryDataType() { - return DiscoveryDataExchangeType.CACHE_CRD_PROC; - } - - /** {@inheritDoc} */ - @SuppressWarnings("ConstantConditions") - @Override public void collectGridNodeData(DiscoveryDataBag dataBag) { - Integer cmpId = discoveryDataType().ordinal(); - - if (!dataBag.commonDataCollectedFor(cmpId)) - dataBag.addGridCommonData(cmpId, discoData); - } - - /** {@inheritDoc} */ - @Override public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) { - MvccDiscoveryData discoData0 = (MvccDiscoveryData)data.commonData(); - - // Disco data might be null in case the first joined node is daemon. - if (discoData0 != null) { - discoData = discoData0; - - log.info("Received mvcc coordinator on node join: " + discoData.coordinator()); - - assert discoData != null; - } + // No-op. } /** {@inheritDoc} */ - @Override public void onDiscoveryEvent(int evtType, Collection<ClusterNode> nodes, long topVer) { - if (evtType == EVT_NODE_METRICS_UPDATED || evtType == EVT_DISCOVERY_CUSTOM_EVT) + @Override public void onDiscoveryEvent(int evtType, Collection<ClusterNode> nodes, long topVer, + @Nullable DiscoveryCustomMessage customMsg) { + if (evtType == EVT_NODE_METRICS_UPDATED) return; - MvccCoordinator crd; - - if (evtType == EVT_NODE_SEGMENTED || evtType == EVT_CLIENT_NODE_DISCONNECTED) - crd = null; - else { - crd = discoData.coordinator(); - - if (crd == null || - ((evtType == EVT_NODE_FAILED || evtType == EVT_NODE_LEFT) && !F.nodeIds(nodes).contains(crd.nodeId()))) { - ClusterNode crdNode = null; - - if (crdC != null) { - crdNode = crdC.apply(nodes); - - if (log.isInfoEnabled()) - log.info("Assigned coordinator using test closure: " + crd); - } - else { - // Expect nodes are sorted by order. - for (ClusterNode node : nodes) { - if (!node.isClient()) { - crdNode = node; - - break; - } - } - } - - crd = crdNode != null ? new MvccCoordinator(crdNode.id(), coordinatorVersion(topVer), new AffinityTopologyVersion(topVer, 0)) : null; - - if (crd != null) { - if (log.isInfoEnabled()) - log.info("Assigned mvcc coordinator [crd=" + crd + ", crdNode=" + crdNode + ']'); - } - else - U.warn(log, "New mvcc coordinator was not assigned [topVer=" + topVer + ']'); - } - } - - discoData = new MvccDiscoveryData(crd); + if (evtType == EVT_DISCOVERY_CUSTOM_EVT) + checkMvccCacheStarted(customMsg); + else + assignMvccCoordinator(evtType, nodes, topVer); } /** {@inheritDoc} */ @@ -384,8 +383,8 @@ class MvccProcessorImpl extends GridProcessorAdapter implements MvccProcessor, D } /** {@inheritDoc} */ - @Override public void onExchangeDone(boolean newCoord, DiscoCache discoCache, Map<UUID, GridLongList> activeQueries) { - if (!newCoord) + @Override public void onExchangeDone(boolean newCrd, DiscoCache discoCache, Map<UUID, GridLongList> activeQueries) { + if (!newCrd) return; ctx.cache().context().tm().rollbackMvccTxOnCoordinatorChange(); @@ -406,7 +405,7 @@ class MvccProcessorImpl extends GridProcessorAdapter implements MvccProcessor, D log.info("Initialize local node as mvcc coordinator [node=" + ctx.localNodeId() + ", crdVer=" + crdVer + ']'); - prevCrdQueries.init(activeQueries, discoCache, ctx.discovery()); + prevCrdQueries.init(activeQueries, F.view(discoCache.allNodes(), this::supportsMvcc), ctx.discovery()); initFut.onDone(); } @@ -435,8 +434,8 @@ class MvccProcessorImpl extends GridProcessorAdapter implements MvccProcessor, D } /** {@inheritDoc} */ - @Override @Nullable public MvccCoordinator coordinatorFromDiscoveryEvent() { - return discoData != null ? discoData.coordinator() : null; + @Override @Nullable public MvccCoordinator assignedCoordinator() { + return assignedCrd; } /** {@inheritDoc} */ @@ -458,6 +457,8 @@ class MvccProcessorImpl extends GridProcessorAdapter implements MvccProcessor, D /** {@inheritDoc} */ @Override public byte state(MvccVersion ver) throws IgniteCheckedException { + assert txLog != null && mvccEnabled; + return txLog.get(ver.coordinatorVersion(), ver.counter()); } @@ -468,6 +469,8 @@ class MvccProcessorImpl extends GridProcessorAdapter implements MvccProcessor, D /** {@inheritDoc} */ @Override public void updateState(MvccVersion ver, byte state, boolean primary) throws IgniteCheckedException { + assert txLog != null && mvccEnabled; + TxKey key = new TxKey(ver.coordinatorVersion(), ver.counter()); txLog.put(key, state, primary); @@ -808,12 +811,104 @@ class MvccProcessorImpl extends GridProcessorAdapter implements MvccProcessor, D return ctx.config().getDataStorageConfiguration(); } + /** */ + private void assignMvccCoordinator(int evtType, Collection<ClusterNode> nodes, long topVer) { + checkMvccSupported(nodes); + + MvccCoordinator crd; + + if (evtType == EVT_NODE_SEGMENTED || evtType == EVT_CLIENT_NODE_DISCONNECTED) + crd = null; + else { + crd = assignedCrd; + + if (crd == null || + ((evtType == EVT_NODE_FAILED || evtType == EVT_NODE_LEFT) && !F.nodeIds(nodes).contains(crd.nodeId()))) { + ClusterNode crdNode = null; + + if (crdC != null) { + crdNode = crdC.apply(nodes); + + if (log.isInfoEnabled()) + log.info("Assigned coordinator using test closure: " + crd); + } + else { + // Expect nodes are sorted by order. + for (ClusterNode node : nodes) { + if (!node.isClient() && supportsMvcc(node)) { + crdNode = node; + + break; + } + } + } + + crd = crdNode != null ? new MvccCoordinator(crdNode.id(), coordinatorVersion(crdNode), + new AffinityTopologyVersion(topVer, 0)) : null; + + if (log.isInfoEnabled() && crd != null) + log.info("Assigned mvcc coordinator [crd=" + crd + ", crdNode=" + crdNode + ']'); + else if (crd == null) + U.warn(log, "New mvcc coordinator was not assigned [topVer=" + topVer + ']'); + } + } + + assignedCrd = crd; + } + /** - * @param topVer Topology version. + * @param crdNode Assigned coordinator node. * @return Coordinator version. */ - private long coordinatorVersion(long topVer) { - return topVer + ctx.discovery().gridStartTime(); + private long coordinatorVersion(ClusterNode crdNode) { + return crdNode.order() + ctx.discovery().gridStartTime(); + } + + /** */ + private void checkMvccSupported(Collection<ClusterNode> nodes) { + if (mvccEnabled) { + assert mvccSupported; + + return; + } + + boolean res = true, was = mvccSupported; + + for (ClusterNode node : nodes) { + if (!supportsMvcc(node)) { + res = false; + + break; + } + } + + if (was != res) + mvccSupported = res; + } + + /** */ + private boolean supportsMvcc(ClusterNode node) { + return node.version().compareToIgnoreTimestamp(MVCC_SUPPORTED_SINCE) >= 0; + } + + /** */ + private void checkMvccCacheStarted(@Nullable DiscoveryCustomMessage customMsg) { + assert customMsg != null; + + if (!mvccEnabled && customMsg instanceof DynamicCacheChangeBatch) { + for (DynamicCacheChangeRequest req : ((DynamicCacheChangeBatch)customMsg).requests()) { + CacheConfiguration ccfg = req.startCacheConfiguration(); + + if (ccfg == null) + continue; + + if (ccfg.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT) { + assert mvccSupported; + + mvccEnabled = true; + } + } + } } /** @@ -952,11 +1047,11 @@ class MvccProcessorImpl extends GridProcessorAdapter implements MvccProcessor, D cleanupQueue = new LinkedBlockingQueue<>(); - vacuumWorkers = new ArrayList<>(ctx.config().getMvccVacuumThreadCnt() + 1); + vacuumWorkers = new ArrayList<>(ctx.config().getMvccVacuumThreadCount() + 1); vacuumWorkers.add(new VacuumScheduler(ctx, log, this)); - for (int i = 0; i < ctx.config().getMvccVacuumThreadCnt(); i++) { + for (int i = 0; i < ctx.config().getMvccVacuumThreadCount(); i++) { vacuumWorkers.add(new VacuumWorker(ctx, log, cleanupQueue)); } @@ -1895,7 +1990,7 @@ class MvccProcessorImpl extends GridProcessorAdapter implements MvccProcessor, D VacuumScheduler(GridKernalContext ctx, IgniteLogger log, MvccProcessorImpl prc) { super(ctx.igniteInstanceName(), "vacuum-scheduler", log); - this.interval = ctx.config().getMvccVacuumTimeInterval(); + this.interval = ctx.config().getMvccVacuumFrequency(); this.prc = prc; } http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab94934/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java index c75393e..0422459 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java @@ -18,7 +18,7 @@ package org.apache.ignite.internal.processors.cache.mvcc; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.IgniteException; import org.apache.ignite.configuration.TransactionConfiguration; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; @@ -102,14 +102,6 @@ public class MvccUtils { } /** - * @param ctx Kernal context. - * @return Newly created Mvcc processor. - */ - public static MvccProcessor createProcessor(GridKernalContext ctx) { - return mvccEnabled(ctx) ? new MvccProcessorImpl(ctx) : new NoOpMvccProcessor(ctx); - } - - /** * @param cctx Cache context. * @param mvccCrd Mvcc coordinator version. * @param mvccCntr Mvcc counter. @@ -720,10 +712,10 @@ public class MvccUtils { /** * @param ctx Grid kernal context. - * @return Whether MVCC is enabled or not on {@link IgniteConfiguration}. + * @return Whether MVCC is enabled or not. */ public static boolean mvccEnabled(GridKernalContext ctx) { - return ctx.config().isMvccEnabled(); + return ctx.coordinators().mvccEnabled(); } /** @@ -794,6 +786,19 @@ public class MvccUtils { return snapshot; } + /** + * Throws atomicity modes compatibility validation exception. + * + * @param ctx1 Cache context. + * @param ctx2 Another cache context. + */ + public static void throwAtomicityModesMismatchException(GridCacheContext ctx1, GridCacheContext ctx2) { + throw new IgniteException("Caches with transactional_snapshot atomicity mode cannot participate in the same" + + " transaction with caches having another atomicity mode. [cacheName=" + ctx1.name() + + ", cacheMode=" + ctx1.config().getAtomicityMode() + + ", anotherCacheName=" + ctx2.name() + " anotherCacheMode=" + ctx2.config().getAtomicityMode() + ']'); + } + /** */ private static MvccVersion mvccVersion(long crd, long cntr, int opCntr) { return new MvccVersionImpl(crd, cntr, opCntr); http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab94934/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/NoOpMvccProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/NoOpMvccProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/NoOpMvccProcessor.java deleted file mode 100644 index b9a5132..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/NoOpMvccProcessor.java +++ /dev/null @@ -1,215 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.mvcc; - -import java.util.Collection; -import java.util.Map; -import java.util.UUID; -import org.apache.ignite.IgniteException; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.IgniteDiagnosticPrepareContext; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.managers.discovery.DiscoCache; -import org.apache.ignite.internal.processors.GridProcessorAdapter; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.ExchangeContext; -import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; -import org.apache.ignite.internal.util.GridLongList; -import org.jetbrains.annotations.Nullable; - -/** - * - */ -class NoOpMvccProcessor extends GridProcessorAdapter implements MvccProcessor { - /** - * @param ctx Kernal context. - */ - protected NoOpMvccProcessor(GridKernalContext ctx) { - super(ctx); - } - - /** {@inheritDoc} */ - @Override public void onDiscoveryEvent(int evtType, Collection<ClusterNode> nodes, long topVer) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void onExchangeStart(MvccCoordinator mvccCrd, ExchangeContext exchCtx, ClusterNode exchCrd) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void onExchangeDone(boolean newCoord, DiscoCache discoCache, - Map<UUID, GridLongList> activeQueries) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void processClientActiveQueries(UUID nodeId, @Nullable GridLongList activeQueries) { - throw processorException(); - } - - /** {@inheritDoc} */ - @Nullable @Override public MvccCoordinator currentCoordinator() { - return null; - } - - /** {@inheritDoc} */ - @Nullable @Override public MvccCoordinator currentCoordinator(AffinityTopologyVersion topVer) { - return null; - } - - /** {@inheritDoc} */ - @Nullable @Override public MvccCoordinator coordinatorFromDiscoveryEvent() { - return null; - } - - /** {@inheritDoc} */ - @Override public UUID currentCoordinatorId() { - return null; - } - - /** {@inheritDoc} */ - @Override public void updateCoordinator(MvccCoordinator curCrd) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public byte state(long crdVer, long cntr) { - throw processorException(); - } - - /** {@inheritDoc} */ - @Override public byte state(MvccVersion ver) { - throw processorException(); - } - - /** {@inheritDoc} */ - @Override public void updateState(MvccVersion ver, byte state) { - throw processorException(); - } - - /** {@inheritDoc} */ - @Override public void updateState(MvccVersion ver, byte state, boolean primary) { - throw processorException(); - } - - /** {@inheritDoc} */ - @Override public void registerLocalTransaction(long crd, long cntr) { - throw processorException(); - } - - /** {@inheritDoc} */ - @Override public boolean hasLocalTransaction(long crd, long cntr) { - throw processorException(); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Void> waitFor(GridCacheContext cctx, - MvccVersion locked) { - throw processorException(); - } - - /** {@inheritDoc} */ - @Override public void addQueryTracker(MvccQueryTracker tracker) { - throw processorException(); - } - - /** {@inheritDoc} */ - @Override public void removeQueryTracker(Long id) { - throw processorException(); - } - - /** {@inheritDoc} */ - @Override public MvccSnapshot tryRequestSnapshotLocal() { - throw processorException(); - } - - /** {@inheritDoc} */ - @Override public MvccSnapshot tryRequestSnapshotLocal( - @Nullable IgniteInternalTx tx) { - throw processorException(); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<MvccSnapshot> requestSnapshotAsync() { - throw processorException(); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<MvccSnapshot> requestSnapshotAsync(IgniteInternalTx tx) { - throw processorException(); - } - - /** {@inheritDoc} */ - @Override public void requestSnapshotAsync(MvccSnapshotResponseListener lsnr) { - throw processorException(); - } - - /** {@inheritDoc} */ - @Override public void requestSnapshotAsync(IgniteInternalTx tx, MvccSnapshotResponseListener lsnr) { - throw processorException(); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Void> ackTxCommit(MvccSnapshot updateVer) { - throw processorException(); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Void> ackTxCommit(MvccVersion updateVer, MvccSnapshot readSnapshot, - long qryId) { - throw processorException(); - } - - /** {@inheritDoc} */ - @Override public void ackTxRollback(MvccVersion updateVer) { - throw processorException(); - } - - /** {@inheritDoc} */ - @Override public void ackTxRollback(MvccVersion updateVer, MvccSnapshot readSnapshot, long qryTrackerId) { - throw processorException(); - } - - /** {@inheritDoc} */ - @Override public void ackQueryDone(MvccSnapshot snapshot, long qryId) { - throw processorException(); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Void> waitTxsFuture(UUID crdId, GridLongList txs) { - throw processorException(); - } - - /** {@inheritDoc} */ - @Override public void dumpDebugInfo(IgniteLogger log, @Nullable IgniteDiagnosticPrepareContext diagCtx) { - // No-op. - } - - /** - * @return No-op processor usage exception; - */ - private IgniteException processorException() { - return new IgniteException("Current Ignite configuration does not support MVCC functionality " + - "(consider adding ignite-schedule module to classpath)."); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab94934/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 0470fad..ab60f47 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -497,7 +497,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { final boolean includeExpired) throws IgniteCheckedException { //TODO IGNITE-7953 - if (!cctx.atomic() && cctx.kernalContext().config().isMvccEnabled()) + if (cctx.transactionalSnapshot()) throw new UnsupportedOperationException("Continuous queries are not supported for transactional caches " + "when MVCC is enabled."); http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab94934/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java index 7dda67f..97ce20a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java @@ -45,6 +45,7 @@ import org.apache.ignite.internal.processors.authentication.AuthorizationContext import org.apache.ignite.internal.processors.bulkload.BulkLoadAckClientParameters; import org.apache.ignite.internal.processors.bulkload.BulkLoadProcessor; import org.apache.ignite.internal.processors.cache.QueryCursorImpl; +import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx; import org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion; @@ -190,10 +191,8 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { log = ctx.log(getClass()); - if (ctx.grid().configuration().isMvccEnabled()) - worker = new JdbcRequestHandlerWorker(ctx.igniteInstanceName(), log, this, ctx); - else - worker = null; + // TODO IGNITE-9484 Do not create worker if there is a possibility to unbind TX from threads. + worker = new JdbcRequestHandlerWorker(ctx.igniteInstanceName(), log, this, ctx); } /** {@inheritDoc} */ @@ -204,7 +203,7 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { JdbcRequest req = (JdbcRequest)req0; - if (worker == null) + if (!MvccUtils.mvccEnabled(ctx)) return doHandle(req); else { GridFutureAdapter<ClientListenerResponse> fut = worker.process(req);
