IGNITE-9320: MVCC configuration: added new CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT. 
This closes #4660.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2ab94934
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2ab94934
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2ab94934

Branch: refs/heads/master
Commit: 2ab949349050a5affc66f01d4f2f00b81776143c
Parents: f018251
Author: rkondakov <[email protected]>
Authored: Mon Sep 17 13:08:19 2018 +0300
Committer: devozerov <[email protected]>
Committed: Mon Sep 17 13:08:19 2018 +0300

----------------------------------------------------------------------
 .../JdbcThinConnectionMvccEnabledSelfTest.java  |   4 +-
 ...ThinTransactionsAbstractComplexSelfTest.java |  10 +-
 .../jdbc/thin/JdbcThinTransactionsSelfTest.java |   6 +-
 ...ThinTransactionsWithMvccEnabledSelfTest.java |   6 +-
 .../apache/ignite/cache/CacheAtomicityMode.java |  33 +-
 .../configuration/IgniteConfiguration.java      |  48 +--
 .../apache/ignite/internal/IgniteKernal.java    |   4 +-
 .../ignite/internal/IgniteNodeAttributes.java   |   3 -
 .../discovery/GridDiscoveryManager.java         |  18 +-
 .../affinity/GridAffinityAssignmentCache.java   |   5 +-
 .../processors/cache/CacheGroupContext.java     |  21 +-
 .../processors/cache/ClusterCachesInfo.java     |  20 +-
 .../processors/cache/GridCacheContext.java      |  10 +-
 .../processors/cache/GridCacheProcessor.java    |  70 +++--
 .../mvcc/MvccPreviousCoordinatorQueries.java    |   7 +-
 .../processors/cache/mvcc/MvccProcessor.java    |  44 ++-
 .../cache/mvcc/MvccProcessorImpl.java           | 309 ++++++++++++-------
 .../processors/cache/mvcc/MvccUtils.java        |  27 +-
 .../cache/mvcc/NoOpMvccProcessor.java           | 215 -------------
 .../continuous/CacheContinuousQueryManager.java |   2 +-
 .../odbc/jdbc/JdbcRequestHandler.java           |   9 +-
 .../odbc/odbc/OdbcRequestHandler.java           |  11 +-
 .../mvcc/CacheMvccAbstractFeatureTest.java      |   2 +-
 .../cache/mvcc/CacheMvccAbstractTest.java       |  21 +-
 .../cache/mvcc/CacheMvccClusterRestartTest.java |   6 +-
 .../CacheMvccConfigurationValidationTest.java   | 245 ++++++++++++++-
 .../mvcc/CacheMvccOperationChecksTest.java      |   4 +-
 .../mvcc/CacheMvccProcessorLazyStartTest.java   | 175 +++++++++++
 .../cache/mvcc/CacheMvccProcessorTest.java      |   4 +
 .../cache/mvcc/CacheMvccVacuumTest.java         |  36 ++-
 .../MemoryPolicyInitializationTest.java         |   8 +-
 .../DataStreamProcessorMvccSelfTest.java        |  17 +-
 .../DataStreamProcessorSelfTest.java            |  20 +-
 .../configvariations/ConfigVariations.java      |   2 +-
 .../testsuites/IgniteCacheMvccTestSuite.java    |   2 +
 .../processors/query/h2/IgniteH2Indexing.java   | 102 ++++--
 .../query/h2/PreparedStatementEx.java           |   7 +-
 .../query/h2/dml/UpdatePlanBuilder.java         |   3 +-
 .../query/h2/sql/GridSqlQueryParser.java        |   2 +
 ...sactionsCommandsWithMvccEnabledSelfTest.java |   4 +-
 .../cache/index/SqlTransactionsSelfTest.java    |   5 +-
 .../cache/mvcc/CacheMvccBulkLoadTest.java       |   2 +-
 .../cache/mvcc/CacheMvccDmlSimpleTest.java      |   2 +-
 ...cheMvccSelectForUpdateQueryAbstractTest.java |  19 +-
 .../cache/mvcc/CacheMvccSizeTest.java           |   4 +-
 ...CacheMvccSqlConfigurationValidationTest.java | 111 +++++++
 .../mvcc/CacheMvccSqlTxQueriesAbstractTest.java |   4 +-
 .../mvcc/CacheMvccStreamingInsertTest.java      |   2 +-
 .../query/h2/GridIndexRebuildSelfTest.java      |   2 +-
 ...GridIndexRebuildWithMvccEnabledSelfTest.java |   5 +-
 .../testsuites/IgniteCacheMvccSqlTestSuite.java |   2 +
 .../cpp/odbc-test/config/queries-default.xml    | 198 ++++++------
 .../odbc-test/config/queries-transaction-32.xml |  11 +-
 .../odbc-test/config/queries-transaction.xml    |  11 +-
 .../config/mvcc/benchmark-mvcc-messages.sh      |  19 +-
 .../config/mvcc/benchmark-mvcc-processor.sh     |  15 +-
 .../mvcc/benchmark-mvcc-updates-contention.sh   |  15 +-
 .../mvcc/benchmark-thin-native.properties       |  47 ++-
 ...benchmark-jdbc-thin-inmemory-mvcc.properties |  21 +-
 .../yardstick/IgniteBenchmarkArguments.java     |  18 +-
 .../org/apache/ignite/yardstick/IgniteNode.java |   2 -
 .../mvcc/MvccUpdateContentionBenchmark.java     |   5 +-
 62 files changed, 1292 insertions(+), 770 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab94934/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionMvccEnabledSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionMvccEnabledSelfTest.java
 
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionMvccEnabledSelfTest.java
index 051d1d2..0196cb2 100644
--- 
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionMvccEnabledSelfTest.java
+++ 
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionMvccEnabledSelfTest.java
@@ -23,6 +23,7 @@ import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.sql.Savepoint;
 import java.util.concurrent.Callable;
+import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.binary.BinaryMarshaller;
@@ -67,8 +68,6 @@ public class JdbcThinConnectionMvccEnabledSelfTest extends 
JdbcThinAbstractSelfT
 
         cfg.setGridLogger(new GridStringLogger());
 
-        cfg.setMvccEnabled(true);
-
         return cfg;
     }
 
@@ -81,6 +80,7 @@ public class JdbcThinConnectionMvccEnabledSelfTest extends 
JdbcThinAbstractSelfT
         CacheConfiguration cfg = defaultCacheConfiguration();
 
         cfg.setName(name);
+        cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT);
 
         return cfg;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab94934/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinTransactionsAbstractComplexSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinTransactionsAbstractComplexSelfTest.java
 
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinTransactionsAbstractComplexSelfTest.java
index 28c65a9..68ed36b 100644
--- 
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinTransactionsAbstractComplexSelfTest.java
+++ 
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinTransactionsAbstractComplexSelfTest.java
@@ -116,15 +116,13 @@ public abstract class 
JdbcThinTransactionsAbstractComplexSelfTest extends JdbcTh
     @Override protected IgniteConfiguration getConfiguration(String 
testIgniteInstanceName) throws Exception {
         IgniteConfiguration cfg = 
super.getConfiguration(testIgniteInstanceName);
 
-        cfg.setMvccEnabled(true);
-
         CacheConfiguration<Integer, Person> ccfg = new 
CacheConfiguration<>("Person");
 
         ccfg.setIndexedTypes(Integer.class, Person.class);
 
         ccfg.getQueryEntities().iterator().next().setKeyFieldName("id");
 
-        ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+        ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT);
 
         ccfg.setCacheMode(CacheMode.PARTITIONED);
 
@@ -145,14 +143,14 @@ public abstract class 
JdbcThinTransactionsAbstractComplexSelfTest extends JdbcTh
         execute("ALTER TABLE \"Person\".person add if not exists companyid 
int");
 
         execute("CREATE TABLE City (id int primary key, name varchar, 
population int) WITH " +
-            
"\"atomicity=transactional,template=partitioned,backups=3,cache_name=City\"");
+            
"\"atomicity=transactional_snapshot,template=partitioned,backups=3,cache_name=City\"");
 
         execute("CREATE TABLE Company (id int, \"cityid\" int, name varchar, 
primary key (id, \"cityid\")) WITH " +
-            
"\"atomicity=transactional,template=partitioned,backups=1,wrap_value=false,affinity_key=cityid,"
 +
+            
"\"atomicity=transactional_snapshot,template=partitioned,backups=1,wrap_value=false,affinity_key=cityid,"
 +
             "cache_name=Company\"");
 
         execute("CREATE TABLE Product (id int primary key, name varchar, 
companyid int) WITH " +
-            
"\"atomicity=transactional,template=partitioned,backups=2,cache_name=Product\"");
+            
"\"atomicity=transactional_snapshot,template=partitioned,backups=2,cache_name=Product\"");
 
         execute("CREATE INDEX IF NOT EXISTS prodidx ON Product(companyid)");
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab94934/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinTransactionsSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinTransactionsSelfTest.java
 
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinTransactionsSelfTest.java
index 1619996..a8fa47b 100644
--- 
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinTransactionsSelfTest.java
+++ 
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinTransactionsSelfTest.java
@@ -26,6 +26,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
@@ -58,8 +59,6 @@ public class JdbcThinTransactionsSelfTest extends 
JdbcThinAbstractSelfTest {
     @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
-        cfg.setMvccEnabled(true);
-
         cfg.setCacheConfiguration(cacheConfiguration(DEFAULT_CACHE_NAME));
 
         TcpDiscoverySpi disco = new TcpDiscoverySpi();
@@ -84,6 +83,7 @@ public class JdbcThinTransactionsSelfTest extends 
JdbcThinAbstractSelfTest {
         CacheConfiguration cfg = defaultCacheConfiguration();
 
         cfg.setName(name);
+        cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT);
 
         return cfg;
     }
@@ -97,7 +97,7 @@ public class JdbcThinTransactionsSelfTest extends 
JdbcThinAbstractSelfTest {
         try (Connection c = c(true, NestedTxMode.ERROR)) {
             try (Statement s = c.createStatement()) {
                 s.execute("CREATE TABLE INTS (k int primary key, v int) WITH 
\"cache_name=ints,wrap_value=false," +
-                    "atomicity=transactional\"");
+                    "atomicity=transactional_snapshot\"");
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab94934/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinTransactionsWithMvccEnabledSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinTransactionsWithMvccEnabledSelfTest.java
 
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinTransactionsWithMvccEnabledSelfTest.java
index e3f7f14..e01a53d 100644
--- 
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinTransactionsWithMvccEnabledSelfTest.java
+++ 
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinTransactionsWithMvccEnabledSelfTest.java
@@ -26,6 +26,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
@@ -58,8 +59,6 @@ public class JdbcThinTransactionsWithMvccEnabledSelfTest 
extends JdbcThinAbstrac
     @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
-        cfg.setMvccEnabled(true);
-
         cfg.setCacheConfiguration(cacheConfiguration(DEFAULT_CACHE_NAME));
 
         TcpDiscoverySpi disco = new TcpDiscoverySpi();
@@ -84,6 +83,7 @@ public class JdbcThinTransactionsWithMvccEnabledSelfTest 
extends JdbcThinAbstrac
         CacheConfiguration cfg = defaultCacheConfiguration();
 
         cfg.setName(name);
+        cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT);
 
         return cfg;
     }
@@ -97,7 +97,7 @@ public class JdbcThinTransactionsWithMvccEnabledSelfTest 
extends JdbcThinAbstrac
         try (Connection c = c(true, NestedTxMode.ERROR)) {
             try (Statement s = c.createStatement()) {
                 s.execute("CREATE TABLE INTS (k int primary key, v int) WITH 
\"cache_name=ints,wrap_value=false," +
-                    "atomicity=transactional\"");
+                    "atomicity=transactional_snapshot\"");
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab94934/modules/core/src/main/java/org/apache/ignite/cache/CacheAtomicityMode.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/CacheAtomicityMode.java 
b/modules/core/src/main/java/org/apache/ignite/cache/CacheAtomicityMode.java
index 79a8e5f..1584286 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/CacheAtomicityMode.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheAtomicityMode.java
@@ -33,8 +33,12 @@ import org.jetbrains.annotations.Nullable;
  */
 public enum CacheAtomicityMode {
     /**
-     * Specified fully {@code ACID}-compliant transactional cache behavior. See
+     * Specifies fully {@code ACID}-compliant transactional cache behavior for the 
key-value API. See
      * {@link Transaction} for more information about transactions.
+     * <p>
+     * <b>Note:</b> this mode guarantees transactional behavior <b>only for 
key-value API</b> operations.
+     * For ACID SQL transactions use {@code 
CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT} mode.
+     * </p>
      */
     TRANSACTIONAL,
 
@@ -88,7 +92,32 @@ public enum CacheAtomicityMode {
      *
      * @see IgniteCache#withNoRetries()
      */
-    ATOMIC;
+    ATOMIC,
+
+    /**
+     * Specifies fully {@code ACID}-compliant transactional cache behavior not 
only for the key-value API,
+     * but also for SQL transactions.
+     * <p>
+     * This cache atomicity mode is implemented using multiversion 
concurrency control (MVCC), where the database can
+     * contain multiple versions of each row so that readers do not collide 
with writers.
+     * Each update in this mode generates a new version of a row and does not 
remove the previous one.
+     * Old versions are cleaned only when they are not visible to anyone.
+     * </p>
+     * <p>
+     * One node in the cluster is elected as the MVCC coordinator. This node 
tracks all in-flight transactions and
+     * queries in the cluster.
+     * Each transaction or query over a cache in {@code 
TRANSACTIONAL_SNAPSHOT} mode obtains the current
+     * database snapshot from the coordinator. This snapshot allows 
transactions and queries to skip invisible
+     * updates made by concurrent transactions, so they always observe the same 
consistent database state.
+     * </p>
+     * <p>
+     * <b>Note!</b> This atomicity mode is not interoperable with the other 
atomicity modes in the same transaction.
+     * Caches participating in a transaction must either be all {@code 
TRANSACTIONAL} or all
+     * {@code TRANSACTIONAL_SNAPSHOT}, but never a mix of the two.
+     * </p>
+     * See {@link Transaction} for more information about transactions.
+     */
+    TRANSACTIONAL_SNAPSHOT;
 
     /** Enumerated values. */
     private static final CacheAtomicityMode[] VALS = values();

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab94934/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
 
b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index 3060caa..6a0c7cb 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -218,8 +218,7 @@ public class IgniteConfiguration {
     public static final int DFLT_MVCC_VACUUM_THREAD_CNT = 2;
 
     /** Default time interval between vacuum process runs (ms). */
-    public static final int DFLT_MVCC_VACUUM_TIME_INTERVAL = 5000;
-
+    public static final int DFLT_MVCC_VACUUM_FREQUENCY = 5000;
 
     /** Optional local Ignite instance name. */
     private String igniteInstanceName;
@@ -494,14 +493,11 @@ public class IgniteConfiguration {
     /** Client connector configuration. */
     private ClientConnectorConfiguration cliConnCfg = 
ClientListenerProcessor.DFLT_CLI_CFG;
 
-    /** Flag whether MVCC is enabled. */
-    private boolean mvccEnabled;
-
     /** Size of MVCC vacuum thread pool. */
     private int mvccVacuumThreadCnt = DFLT_MVCC_VACUUM_THREAD_CNT;
 
     /** Time interval between vacuum process runs (ms). */
-    private int mvccVacuumTimeInterval = DFLT_MVCC_VACUUM_TIME_INTERVAL;
+    private int mvccVacuumFreq = DFLT_MVCC_VACUUM_FREQUENCY;
 
     /** User authentication enabled. */
     private boolean authEnabled;
@@ -595,9 +591,8 @@ public class IgniteConfiguration {
         metricsLogFreq = cfg.getMetricsLogFrequency();
         metricsUpdateFreq = cfg.getMetricsUpdateFrequency();
         mgmtPoolSize = cfg.getManagementThreadPoolSize();
-        mvccEnabled = cfg.isMvccEnabled();
-        mvccVacuumThreadCnt = cfg.mvccVacuumThreadCnt;
-        mvccVacuumTimeInterval = cfg.mvccVacuumTimeInterval;
+        mvccVacuumThreadCnt = cfg.getMvccVacuumThreadCount();
+        mvccVacuumFreq = cfg.getMvccVacuumFrequency();
         netTimeout = cfg.getNetworkTimeout();
         nodeId = cfg.getNodeId();
         odbcCfg = cfg.getOdbcConfiguration();
@@ -3003,32 +2998,11 @@ public class IgniteConfiguration {
     }
 
     /**
-     * Whether or not MVCC is enabled.
-     *
-     * @return {@code True} if MVCC is enabled.
-     */
-    public boolean isMvccEnabled() {
-        return mvccEnabled;
-    }
-
-    /**
-     * Sets MVCC enabled flag.
-     *
-     * @param mvccEnabled MVCC enabled flag.
-     * @return {@code this} for chaining.
-     */
-    public IgniteConfiguration setMvccEnabled(boolean mvccEnabled) {
-        this.mvccEnabled = mvccEnabled;
-
-        return this;
-    }
-
-    /**
      * Returns number of MVCC vacuum cleanup threads.
      *
      * @return Number of MVCC vacuum cleanup threads.
      */
-    public int getMvccVacuumThreadCnt() {
+    public int getMvccVacuumThreadCount() {
         return mvccVacuumThreadCnt;
     }
 
@@ -3038,7 +3012,7 @@ public class IgniteConfiguration {
      * @param mvccVacuumThreadCnt Number of MVCC vacuum cleanup threads.
      * @return {@code this} for chaining.
      */
-    public IgniteConfiguration setMvccVacuumThreadCnt(int mvccVacuumThreadCnt) 
{
+    public IgniteConfiguration setMvccVacuumThreadCount(int 
mvccVacuumThreadCnt) {
         this.mvccVacuumThreadCnt = mvccVacuumThreadCnt;
 
         return this;
@@ -3049,18 +3023,18 @@ public class IgniteConfiguration {
      *
      * @return Time interval between vacuum runs.
      */
-    public int getMvccVacuumTimeInterval() {
-        return mvccVacuumTimeInterval;
+    public int getMvccVacuumFrequency() {
+        return mvccVacuumFreq;
     }
 
     /**
      * Sets time interval between vacuum runs.
      *
-     * @param mvccVacuumTimeInterval Time interval between vacuum runs.
+     * @param mvccVacuumFreq Time interval between vacuum runs.
      * @return {@code this} for chaining.
      */
-    public IgniteConfiguration setMvccVacuumTimeInterval(int 
mvccVacuumTimeInterval) {
-        this.mvccVacuumTimeInterval = mvccVacuumTimeInterval;
+    public IgniteConfiguration setMvccVacuumFrequency(int mvccVacuumFreq) {
+        this.mvccVacuumFreq = mvccVacuumFreq;
 
         return this;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab94934/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 230c05c..d74b3aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -125,7 +125,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheUtilityKey;
 import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import 
org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccProcessorImpl;
 import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
 import 
org.apache.ignite.internal.processors.cache.persistence.filename.PdsConsistentIdProcessor;
 import 
org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
@@ -994,7 +994,7 @@ public class IgniteKernal implements IgniteEx, 
IgniteMXBean, Externalizable {
             // be able to start receiving messages once discovery completes.
             try {
                 startProcessor(new PdsConsistentIdProcessor(ctx));
-                startProcessor(MvccUtils.createProcessor(ctx));
+                startProcessor(new MvccProcessorImpl(ctx));
                 
startProcessor(createComponent(DiscoveryNodeValidationProcessor.class, ctx));
                 startProcessor(new GridAffinityProcessor(ctx));
                 
startProcessor(createComponent(GridSegmentationProcessor.class, ctx));

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab94934/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
index 539727d..5b764e4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
@@ -202,9 +202,6 @@ public final class IgniteNodeAttributes {
     /** Internal attribute name constant. */
     public static final String ATTR_DYNAMIC_CACHE_START_ROLLBACK_SUPPORTED = 
ATTR_PREFIX + ".dynamic.cache.start.rollback.supported";
 
-    /** Mvcc enabled flag. */
-    public static final String ATTR_MVCC_ENABLED = ATTR_PREFIX + 
".mvcc.enabled";
-
     /**
      * Enforces singleton.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab94934/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 48e3318..b9af961 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -162,7 +162,6 @@ import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_LATE_AFFINITY
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS;
 import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER_USE_BINARY_STRING_SER_VER_2;
 import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER_USE_DFLT_SUID;
-import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MVCC_ENABLED;
 import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_OFFHEAP_SIZE;
 import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_PEER_CLASSLOADING;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_PHY_RAM;
@@ -658,7 +657,7 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
                     updateClientNodes(node.id());
                 }
 
-                ctx.coordinators().onDiscoveryEvent(type, topSnapshot, topVer);
+                ctx.coordinators().onDiscoveryEvent(type, topSnapshot, topVer, 
customMsg);
 
                 boolean locJoinEvt = type == EVT_NODE_JOINED && 
node.id().equals(locNode.id());
 
@@ -1222,8 +1221,6 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
         Boolean locSrvcCompatibilityEnabled = 
locNode.attribute(ATTR_SERVICES_COMPATIBILITY_MODE);
         Boolean locSecurityCompatibilityEnabled = 
locNode.attribute(ATTR_SECURITY_COMPATIBILITY_MODE);
 
-        Boolean locMvccEnabled = locNode.attribute(ATTR_MVCC_ENABLED);
-
         for (ClusterNode n : nodes) {
             int rmtJvmMajVer = nodeJavaMajorVersion(n);
 
@@ -1321,17 +1318,6 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
                     ", locNodeId=" + locNode.id() + ", rmtNode=" + 
U.toShortString(n) + "]");
             }
 
-            Boolean rmtMvccEnabled = n.attribute(ATTR_MVCC_ENABLED);
-
-            if (!F.eq(locMvccEnabled, rmtMvccEnabled)) {
-                throw new IgniteCheckedException("Remote node has MVCC mode 
different from local " +
-                    "[locId8=" +  U.id8(locNode.id()) +
-                    ", locMvccMode=" + (Boolean.TRUE.equals(locMvccEnabled) ? 
"ENABLED" : "DISABLED") +
-                    ", rmtId8=" + U.id8(n.id()) +
-                    ", rmtMvccMode=" + (Boolean.TRUE.equals(rmtMvccEnabled) ? 
"ENABLED" : "DISABLED") +
-                    ", rmtAddrs=" + U.addressesAsString(n) + ", rmtNode=" + 
U.toShortString(n) + "]");
-            }
-
             if 
(n.version().compareToIgnoreTimestamp(SERVICE_PERMISSIONS_SINCE) >= 0
                 && ctx.security().enabled() // Matters only if security 
enabled.
                ) {
@@ -2388,7 +2374,7 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
         Collection<ClusterNode> topSnapshot) {
         assert topSnapshot.contains(loc);
 
-        MvccCoordinator mvccCrd = 
ctx.coordinators().coordinatorFromDiscoveryEvent();
+        MvccCoordinator mvccCrd = ctx.coordinators().assignedCoordinator();
 
         HashSet<UUID> alives = U.newHashSet(topSnapshot.size());
         HashMap<UUID, ClusterNode> nodeMap = U.newHashMap(topSnapshot.size());

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab94934/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index cc2c17c..2290ce6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -203,7 +203,10 @@ public class GridAffinityAssignmentCache {
      * @param affAssignment Affinity assignment for topology version.
      */
     public void initialize(AffinityTopologyVersion topVer, 
List<List<ClusterNode>> affAssignment) {
-        MvccCoordinator mvccCrd = 
ctx.cache().context().coordinators().currentCoordinator(topVer);
+        MvccCoordinator mvccCrd = null;
+
+        if (!locCache)
+            mvccCrd = 
ctx.cache().context().coordinators().currentCoordinator(topVer);
 
         initialize(topVer, affAssignment, mvccCrd);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab94934/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
index c100d16..b92280a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -30,7 +30,6 @@ import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataPageEvictionMode;
-import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.TopologyValidator;
 import org.apache.ignite.events.CacheRebalancingEvent;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
@@ -42,7 +41,6 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffini
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopologyImpl;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.PartitionsEvictManager;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
 import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
 import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager;
@@ -61,7 +59,7 @@ import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.mxbean.CacheGroupMetricsMXBean;
 import org.jetbrains.annotations.Nullable;
 
-import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static 
org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
 import static org.apache.ignite.cache.CacheMode.LOCAL;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cache.CacheRebalanceMode.NONE;
@@ -218,26 +216,13 @@ public class CacheGroupContext {
 
         storeCacheId = affNode && dataRegion.config().getPageEvictionMode() != 
DataPageEvictionMode.DISABLED;
 
+        mvccEnabled = ccfg.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT;
+
         log = ctx.kernalContext().log(getClass());
 
         caches = new ArrayList<>();
 
         mxBean = new CacheGroupMetricsMXBeanImpl(this);
-
-        mvccEnabled = mvccEnabled(ctx.gridConfig(), ccfg, cacheType);
-    }
-
-    /**
-     * @param cfg Ignite configuration.
-     * @param ccfg Cache configuration.
-     * @param cacheType Cache typr.
-     * @return {@code True} if mvcc is enabled for given cache.
-     */
-    public static boolean mvccEnabled(IgniteConfiguration cfg, 
CacheConfiguration ccfg, CacheType cacheType) {
-        return cfg.isMvccEnabled() &&
-            cacheType == CacheType.USER &&
-            ccfg.getCacheMode() != LOCAL &&
-            ccfg.getAtomicityMode() == TRANSACTIONAL;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab94934/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 8eab9c0..572e33e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -69,6 +69,7 @@ import org.apache.ignite.plugin.PluginProvider;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag;
 import org.jetbrains.annotations.Nullable;
 
+import static 
org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
 import static org.apache.ignite.cache.CacheMode.LOCAL;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
@@ -147,7 +148,7 @@ class ClusterCachesInfo {
             if (ccfg == null)
                 grpCfgs.put(info.cacheData().config().getGroupName(), 
info.cacheData().config());
             else
-                validateCacheGroupConfiguration(ccfg, 
info.cacheData().config(), info.cacheType());
+                validateCacheGroupConfiguration(ccfg, 
info.cacheData().config());
         }
 
         String conflictErr = processJoiningNode(joinDiscoData, 
ctx.localNodeId(), true);
@@ -220,7 +221,7 @@ class ClusterCachesInfo {
                 }
 
                 if (checkConsistency)
-                    validateStartCacheConfiguration(locCfg, 
cacheData.cacheType());
+                    validateStartCacheConfiguration(locCfg);
             }
         }
 
@@ -1864,17 +1865,16 @@ class ClusterCachesInfo {
 
     /**
      * @param ccfg Cache configuration to start.
-     * @param cacheType Cache type.
      * @throws IgniteCheckedException If failed.
      */
-    void validateStartCacheConfiguration(CacheConfiguration ccfg, CacheType 
cacheType) throws IgniteCheckedException {
+    void validateStartCacheConfiguration(CacheConfiguration ccfg) throws 
IgniteCheckedException {
         if (ccfg.getGroupName() != null) {
             CacheGroupDescriptor grpDesc = 
cacheGroupByName(ccfg.getGroupName());
 
             if (grpDesc != null) {
                 assert ccfg.getGroupName().equals(grpDesc.groupName());
 
-                validateCacheGroupConfiguration(grpDesc.config(), ccfg, 
cacheType);
+                validateCacheGroupConfiguration(grpDesc.config(), ccfg);
             }
         }
     }
@@ -1882,10 +1882,9 @@ class ClusterCachesInfo {
     /**
      * @param cfg Existing configuration.
      * @param startCfg Cache configuration to start.
-     * @param cacheType Cache type.
      * @throws IgniteCheckedException If validation failed.
      */
-    private void validateCacheGroupConfiguration(CacheConfiguration cfg, 
CacheConfiguration startCfg, CacheType cacheType)
+    private void validateCacheGroupConfiguration(CacheConfiguration cfg, 
CacheConfiguration startCfg)
         throws IgniteCheckedException {
         GridCacheAttributes attr1 = new GridCacheAttributes(cfg);
         GridCacheAttributes attr2 = new GridCacheAttributes(startCfg);
@@ -1893,10 +1892,9 @@ class ClusterCachesInfo {
         CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, 
"cacheMode", "Cache mode",
             cfg.getCacheMode(), startCfg.getCacheMode(), true);
 
-        CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, 
"mvccEnabled", "MVCC mode",
-            CacheGroupContext.mvccEnabled(ctx.config(), cfg, cacheType),
-            CacheGroupContext.mvccEnabled(ctx.config(), startCfg, cacheType),
-            true);
+        if (cfg.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT || 
startCfg.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT)
+            CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, 
"atomicityMode", "Atomicity mode",
+                attr1.atomicityMode(), attr2.atomicityMode(), true);
 
         CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, 
"affinity", "Affinity function",
             attr1.cacheAffinityClassName(), attr2.cacheAffinityClassName(), 
true);

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab94934/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index ac9de7c..7d200eb 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -112,6 +112,7 @@ import org.jetbrains.annotations.Nullable;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_READ_LOAD_BALANCING;
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static 
org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static 
org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED;
@@ -817,7 +818,14 @@ public class GridCacheContext<K, V> implements 
Externalizable {
      * @return {@code True} if transactional.
      */
     public boolean transactional() {
-        return cacheCfg.getAtomicityMode() == TRANSACTIONAL;
+        return cacheCfg.getAtomicityMode() == TRANSACTIONAL || 
cacheCfg.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT;
+    }
+
+    /**
+     * @return {@code True} if transactional snapshot.
+     */
+    public boolean transactionalSnapshot() {
+        return cacheCfg.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab94934/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index e0a2420..f000da7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -35,7 +35,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.stream.Collectors;
-import javax.cache.expiry.EternalExpiryPolicy;
 import javax.management.MBeanServer;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
@@ -165,7 +164,7 @@ import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_REMOVED_ENTR
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
 import static org.apache.ignite.IgniteSystemProperties.getBoolean;
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
-import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static 
org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
 import static org.apache.ignite.cache.CacheMode.LOCAL;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
@@ -278,6 +277,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
         throws IgniteCheckedException {
         CU.initializeConfigDefaults(log, cfg, cacheObjCtx);
 
+        ctx.coordinators().preProcessCacheConfiguration(cfg);
         ctx.igfsHelper().preProcessCacheConfiguration(cfg);
     }
 
@@ -525,6 +525,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
         }
 
         ctx.igfsHelper().validateCacheConfiguration(cc);
+        ctx.coordinators().validateCacheConfiguration(cc);
 
         if (cc.getAtomicityMode() == ATOMIC)
             assertParameter(cc.getTransactionManagerLookupClassName() == null,
@@ -543,24 +544,6 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
                 "other cache types [cacheName=" + cc.getName() + ", 
groupName=" + cc.getGroupName() +
                 ", cacheType=" + cacheType + "]");
 
-        if (cc.getAtomicityMode() == TRANSACTIONAL && c.isMvccEnabled()) {
-            if (cc.getCacheStoreFactory() != null) {
-                throw new IgniteCheckedException("Transactional cache may not 
have a third party cache store when " +
-                    "MVCC is enabled.");
-            }
-
-            if (cc.getExpiryPolicyFactory() != null && 
!(cc.getExpiryPolicyFactory().create() instanceof
-                EternalExpiryPolicy)) {
-                throw new IgniteCheckedException("Transactional cache may not 
have expiry policy when " +
-                    "MVCC is enabled.");
-            }
-
-            if (cc.getInterceptor() != null) {
-                throw new IgniteCheckedException("Transactional cache may not 
have an interceptor when " +
-                    "MVCC is enabled.");
-            }
-        }
-
         // Make sure we do not use sql schema for system views.
         if (ctx.query().moduleEnabled()) {
             String schema = QueryUtils.normalizeSchemaName(cc.getName(), 
cc.getSqlSchema());
@@ -827,14 +810,42 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
         if (CU.isPersistenceEnabled(ctx.config()) && 
ctx.cache().context().pageStore() != null) {
             Map<String, StoredCacheData> storedCaches = 
ctx.cache().context().pageStore().readCacheConfigurations();
 
-            if (!F.isEmpty(storedCaches))
+            if (!F.isEmpty(storedCaches)) {
                 for (StoredCacheData storedCacheData : storedCaches.values()) {
                     String cacheName = storedCacheData.config().getName();
 
                     //Ignore stored caches if it already added by static 
config(static config has higher priority).
                     if (!caches.containsKey(cacheName))
                         addStoredCache(caches, storedCacheData, cacheName, 
cacheType(cacheName), false);
+                    else {
+                        CacheConfiguration cfg = 
caches.get(cacheName).cacheData().config();
+                        CacheConfiguration cfgFromStore = 
storedCacheData.config();
+
+                        validateCacheConfigurationOnRestore(cfg, cfgFromStore);
+                    }
                 }
+            }
+        }
+    }
+
+    /**
+     * Validates cache configuration against stored cache configuration when 
persistence is enabled.
+     *
+     * @param cfg Configured cache configuration.
+     * @param cfgFromStore Stored cache configuration.
+     * @throws IgniteCheckedException If validation failed.
+     */
+    private void validateCacheConfigurationOnRestore(CacheConfiguration cfg, 
CacheConfiguration cfgFromStore)
+        throws IgniteCheckedException {
+        assert cfg != null && cfgFromStore != null;
+
+        if ((cfg.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT ||
+            cfgFromStore.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT)
+            && cfg.getAtomicityMode() != cfgFromStore.getAtomicityMode()) {
+            throw new IgniteCheckedException("Cannot start cache. Statically 
configured atomicity mode differs from " +
+                "previously stored configuration. Please check your 
configuration: [cacheName=" + cfg.getName() +
+                ", configuredAtomicityMode=" + cfg.getAtomicityMode() +
+                ", storedAtomicityMode=" + cfgFromStore.getAtomicityMode() + 
"]");
         }
     }
 
@@ -1462,6 +1473,9 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
 
         pluginMgr.validate();
 
+        if (cfg.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT)
+            sharedCtx.coordinators().ensureStarted();
+
         sharedCtx.jta().registerCache(cfg);
 
         // Skip suggestions for internal caches.
@@ -1532,7 +1546,8 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
         switch (cfg.getCacheMode()) {
             case LOCAL: {
                 switch (cfg.getAtomicityMode()) {
-                    case TRANSACTIONAL: {
+                    case TRANSACTIONAL:
+                    case TRANSACTIONAL_SNAPSHOT: {
                         cache = new GridLocalCache(cacheCtx);
 
                         break;
@@ -1554,7 +1569,8 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
             case REPLICATED: {
                 if (nearEnabled) {
                     switch (cfg.getAtomicityMode()) {
-                        case TRANSACTIONAL: {
+                        case TRANSACTIONAL:
+                        case TRANSACTIONAL_SNAPSHOT: {
                             cache = new GridNearTransactionalCache(cacheCtx);
 
                             break;
@@ -1572,7 +1588,8 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
                 }
                 else {
                     switch (cfg.getAtomicityMode()) {
-                        case TRANSACTIONAL: {
+                        case TRANSACTIONAL:
+                        case TRANSACTIONAL_SNAPSHOT: {
                             cache = cacheCtx.affinityNode() ?
                                 new GridDhtColocatedCache(cacheCtx) :
                                 new GridDhtColocatedCache(cacheCtx, new 
GridNoStorageCacheMap());
@@ -1662,7 +1679,8 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
             GridDhtCacheAdapter dht = null;
 
             switch (cfg.getAtomicityMode()) {
-                case TRANSACTIONAL: {
+                case TRANSACTIONAL:
+                case TRANSACTIONAL_SNAPSHOT: {
                     assert cache instanceof GridNearTransactionalCache;
 
                     GridNearTransactionalCache near = 
(GridNearTransactionalCache)cache;
@@ -3428,7 +3446,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
                     CacheConfiguration ccfg = req.startCacheConfiguration();
 
                     try {
-                        cachesInfo.validateStartCacheConfiguration(ccfg, 
req.cacheType());
+                        cachesInfo.validateStartCacheConfiguration(ccfg);
                     }
                     catch (IgniteCheckedException e) {
                         fut.onDone(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab94934/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccPreviousCoordinatorQueries.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccPreviousCoordinatorQueries.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccPreviousCoordinatorQueries.java
index 024ef70..cd7560f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccPreviousCoordinatorQueries.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccPreviousCoordinatorQueries.java
@@ -24,7 +24,6 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
 import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.typedef.F;
@@ -56,17 +55,17 @@ class MvccPreviousCoordinatorQueries {
 
     /**
      * @param nodeQueries Active queries map.
-     * @param discoCache Discovery data.
+     * @param nodes Cluster nodes.
      * @param mgr Discovery manager.
      */
-    void init(Map<UUID, GridLongList> nodeQueries, DiscoCache discoCache, 
GridDiscoveryManager mgr) {
+    void init(Map<UUID, GridLongList> nodeQueries, Collection<ClusterNode> 
nodes, GridDiscoveryManager mgr) {
         synchronized (this) {
             assert !initDone;
             assert waitNodes == null;
 
             waitNodes = new HashSet<>();
 
-            for (ClusterNode node : discoCache.allNodes()) {
+            for (ClusterNode node : nodes) {
                 if ((nodeQueries == null || 
!nodeQueries.containsKey(node.id())) &&
                     mgr.alive(node) &&
                     !F.contains(rcvd, node.id()))

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab94934/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java
index bce61a1..a09468f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java
@@ -23,10 +23,12 @@ import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.IgniteDiagnosticPrepareContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.processors.GridProcessor;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.ExchangeContext;
@@ -43,8 +45,10 @@ public interface MvccProcessor extends GridProcessor {
      * @param evtType Event type.
      * @param nodes Current nodes.
      * @param topVer Topology version.
+     * @param customMsg Custom discovery message (may be {@code null}).
      */
-    void onDiscoveryEvent(int evtType, Collection<ClusterNode> nodes, long 
topVer);
+    void onDiscoveryEvent(int evtType, Collection<ClusterNode> nodes, long 
topVer,
+        @Nullable DiscoveryCustomMessage customMsg);
 
     /**
      * Exchange start callback.
@@ -71,6 +75,11 @@ public interface MvccProcessor extends GridProcessor {
     void processClientActiveQueries(UUID nodeId, @Nullable GridLongList 
activeQueries);
 
     /**
+     * @return Mvcc coordinator received from discovery event.
+     */
+    @Nullable MvccCoordinator assignedCoordinator();
+
+    /**
      * @return Coordinator.
      */
     @Nullable MvccCoordinator currentCoordinator();
@@ -83,11 +92,6 @@ public interface MvccProcessor extends GridProcessor {
     @Nullable MvccCoordinator currentCoordinator(AffinityTopologyVersion 
topVer);
 
     /**
-     * @return Mvcc coordinator received from discovery event.
-     */
-    @Nullable MvccCoordinator coordinatorFromDiscoveryEvent();
-
-    /**
      * @return Current coordinator node ID.
      */
     UUID currentCoordinatorId();
@@ -247,4 +251,32 @@ public interface MvccProcessor extends GridProcessor {
      * @param diagCtx Diagnostic request.
      */
     void dumpDebugInfo(IgniteLogger log, @Nullable 
IgniteDiagnosticPrepareContext diagCtx);
+
+    /**
+     * @return {@code True} if at least one cache with
+     * {@code CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT} mode is registered.
+     */
+    boolean mvccEnabled();
+
+    /**
+     * Pre-processes cache configuration before start.
+     *
+     * @param ccfg Cache configuration to pre-process.
+     */
+    void preProcessCacheConfiguration(CacheConfiguration ccfg);
+
+    /**
+     * Validates cache configuration before start.
+     *
+     * @param ccfg Cache configuration to validate.
+     * @throws IgniteCheckedException If validation failed.
+     */
+    void validateCacheConfiguration(CacheConfiguration ccfg) throws 
IgniteCheckedException;
+
+    /**
+     * Starts MVCC processor (i.e. initialises data structures and vacuum) if 
it has not been started yet.
+     *
+     * @throws IgniteCheckedException If failed to initialize.
+     */
+    void ensureStarted() throws IgniteCheckedException;
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab94934/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
index 220f0c0..d1e2dc5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
@@ -28,9 +28,13 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.expiry.EternalExpiryPolicy;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
@@ -40,15 +44,17 @@ import 
org.apache.ignite.internal.IgniteDiagnosticPrepareContext;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
+import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
 import org.apache.ignite.internal.processors.cache.ExchangeContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
@@ -92,13 +98,14 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.IgniteNodeValidationResult;
-import org.apache.ignite.spi.discovery.DiscoveryDataBag;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
+import static 
org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
 import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
@@ -127,7 +134,10 @@ import static 
org.apache.ignite.internal.processors.cache.persistence.CacheDataR
  * MVCC processor.
  */
 @SuppressWarnings("serial")
-class MvccProcessorImpl extends GridProcessorAdapter implements MvccProcessor, 
DatabaseLifecycleListener {
+public class MvccProcessorImpl extends GridProcessorAdapter implements 
MvccProcessor, DatabaseLifecycleListener {
+    /** */
+    private static final IgniteProductVersion MVCC_SUPPORTED_SINCE = 
IgniteProductVersion.fromString("2.7.0");
+
     /** */
     private static final Waiter LOCAL_TRANSACTION_MARKER = new 
LocalTransactionMarker();
 
@@ -153,6 +163,9 @@ class MvccProcessorImpl extends GridProcessorAdapter 
implements MvccProcessor, D
     private volatile MvccCoordinator curCrd;
 
     /** */
+    private volatile MvccCoordinator assignedCrd;
+
+    /** */
     private TxLog txLog;
 
     /** */
@@ -170,9 +183,6 @@ class MvccProcessorImpl extends GridProcessorAdapter 
implements MvccProcessor, D
     private volatile Throwable vacuumError;
 
     /** */
-    private MvccDiscoveryData discoData = new MvccDiscoveryData(null);
-
-    /** */
     private final GridAtomicLong futIdCntr = new GridAtomicLong(0);
 
     /** */
@@ -208,6 +218,12 @@ class MvccProcessorImpl extends GridProcessorAdapter 
implements MvccProcessor, D
     /** */
     private final GridFutureAdapter<Void> initFut = new GridFutureAdapter<>();
 
+    /** Flag whether at least one cache with {@code 
CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT} mode is registered. */
+    private volatile boolean mvccEnabled;
+
+    /** Flag whether all nodes in cluster support MVCC. */
+    private volatile boolean mvccSupported = true;
+
     /**
      * @param ctx Context.
      */
@@ -219,8 +235,6 @@ class MvccProcessorImpl extends GridProcessorAdapter 
implements MvccProcessor, D
 
     /** {@inheritDoc} */
     @Override public void start() throws IgniteCheckedException {
-        ctx.addNodeAttribute(IgniteNodeAttributes.ATTR_MVCC_ENABLED, 
Boolean.TRUE);
-
         ctx.event().addLocalEventListener(new 
CacheCoordinatorNodeFailListener(),
             EVT_NODE_FAILED, EVT_NODE_LEFT);
 
@@ -228,12 +242,57 @@ class MvccProcessorImpl extends GridProcessorAdapter 
implements MvccProcessor, D
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public IgniteNodeValidationResult 
validateNode(ClusterNode node) {
-        Boolean rmtEnabled = 
node.attribute(IgniteNodeAttributes.ATTR_MVCC_ENABLED);
+    @Override public boolean mvccEnabled() {
+        return mvccEnabled;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void preProcessCacheConfiguration(CacheConfiguration 
ccfg) {
+        if (ccfg.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT) {
+            if (!mvccSupported)
+                throw new IgniteException("Cannot start MVCC transactional 
cache. " +
+                    "MVCC is unsupported by the cluster.");
+
+            mvccEnabled = true;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void validateCacheConfiguration(CacheConfiguration ccfg) 
throws IgniteCheckedException {
+        if (ccfg.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT) {
+            if (!mvccSupported)
+                throw new IgniteException("Cannot start MVCC transactional 
cache. " +
+                    "MVCC is unsupported by the cluster.");
+
+            if (ccfg.getCacheStoreFactory() != null) {
+                throw new IgniteCheckedException("Transactional cache may not 
have a third party cache store when " +
+                    "MVCC is enabled.");
+            }
+
+            if (ccfg.getExpiryPolicyFactory() != null && 
!(ccfg.getExpiryPolicyFactory().create() instanceof
+                EternalExpiryPolicy)) {
+                throw new IgniteCheckedException("Transactional cache may not 
have expiry policy when " +
+                    "MVCC is enabled.");
+            }
+
+            if (ccfg.getInterceptor() != null) {
+                throw new IgniteCheckedException("Transactional cache may not 
have an interceptor when " +
+                    "MVCC is enabled.");
+            }
+
+            if (ccfg.getCacheMode() == CacheMode.LOCAL)
+                throw new IgniteCheckedException("Local caches are not 
supported by MVCC engine. Use " +
+                    "CacheAtomicityMode.TRANSACTIONAL for local caches.");
+
+            mvccEnabled = true;
+        }
+    }
 
-        if (rmtEnabled == null) {
-            String errMsg = "Failed to add node to topology because MVCC is 
enabled on cluster and " +
-                "the node doesn't support MVCC or MVCC is disabled for the 
node [nodeId=" + node.id() + ']';
+    /** {@inheritDoc} */
+    @Nullable @Override public IgniteNodeValidationResult 
validateNode(ClusterNode node) {
+        if (mvccEnabled && 
node.version().compareToIgnoreTimestamp(MVCC_SUPPORTED_SINCE) < 0) {
+            String errMsg = "Failed to add node to topology. MVCC is enabled 
on the cluster, but " +
+                "the node doesn't support MVCC [nodeId=" + node.id() + ']';
 
             return new IgniteNodeValidationResult(node.id(), errMsg, errMsg);
         }
@@ -242,6 +301,19 @@ class MvccProcessorImpl extends GridProcessorAdapter 
implements MvccProcessor, D
     }
 
     /** {@inheritDoc} */
+    @Override public void ensureStarted() throws IgniteCheckedException {
+        if (!ctx.clientNode() && txLog == null) {
+            assert mvccEnabled && mvccSupported;
+
+            txLog = new TxLog(ctx, ctx.cache().context().database());
+
+            startVacuumWorkers();
+
+            log.info("Mvcc processor started.");
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public void beforeStop(IgniteCacheDatabaseSharedManager mgr) {
         stopVacuumWorkers();
 
@@ -250,6 +322,7 @@ class MvccProcessorImpl extends GridProcessorAdapter 
implements MvccProcessor, D
 
     /** {@inheritDoc} */
     @Override public void onInitDataRegions(IgniteCacheDatabaseSharedManager 
mgr) throws IgniteCheckedException {
+        // We have to always init txLog data region.
         DataStorageConfiguration dscfg = dataStorageConfiguration();
 
         mgr.addDataRegion(
@@ -260,13 +333,7 @@ class MvccProcessorImpl extends GridProcessorAdapter implements MvccProcessor, D
 
     /** {@inheritDoc} */
     @Override public void afterInitialise(IgniteCacheDatabaseSharedManager 
mgr) throws IgniteCheckedException {
-        if (!CU.isPersistenceEnabled(ctx.config())) {
-            assert txLog == null;
-
-            txLog = new TxLog(ctx, mgr);
-
-            startVacuumWorkers();
-        }
+        // No-op.
     }
 
     /** {@inheritDoc} */
@@ -281,87 +348,19 @@ class MvccProcessorImpl extends GridProcessorAdapter implements MvccProcessor, D
 
     /** {@inheritDoc} */
     @Override public void afterMemoryRestore(IgniteCacheDatabaseSharedManager 
mgr) throws IgniteCheckedException {
-        assert CU.isPersistenceEnabled(ctx.config());
-        assert txLog == null;
-
-        txLog = new TxLog(ctx, mgr);
-
-        startVacuumWorkers();
-    }
-
-    /** {@inheritDoc} */
-    @Override public DiscoveryDataExchangeType discoveryDataType() {
-        return DiscoveryDataExchangeType.CACHE_CRD_PROC;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("ConstantConditions")
-    @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
-        Integer cmpId = discoveryDataType().ordinal();
-
-        if (!dataBag.commonDataCollectedFor(cmpId))
-            dataBag.addGridCommonData(cmpId, discoData);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void 
onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) {
-        MvccDiscoveryData discoData0 = (MvccDiscoveryData)data.commonData();
-
-        // Disco data might be null in case the first joined node is daemon.
-        if (discoData0 != null) {
-            discoData = discoData0;
-
-            log.info("Received mvcc coordinator on node join: " + 
discoData.coordinator());
-
-            assert discoData != null;
-        }
+        // No-op.
     }
 
     /** {@inheritDoc} */
-    @Override public void onDiscoveryEvent(int evtType, 
Collection<ClusterNode> nodes, long topVer) {
-        if (evtType == EVT_NODE_METRICS_UPDATED || evtType == 
EVT_DISCOVERY_CUSTOM_EVT)
+    @Override public void onDiscoveryEvent(int evtType, 
Collection<ClusterNode> nodes, long topVer,
+        @Nullable DiscoveryCustomMessage customMsg) {
+        if (evtType == EVT_NODE_METRICS_UPDATED)
             return;
 
-        MvccCoordinator crd;
-
-        if (evtType == EVT_NODE_SEGMENTED || evtType == 
EVT_CLIENT_NODE_DISCONNECTED)
-            crd = null;
-        else {
-            crd = discoData.coordinator();
-
-            if (crd == null ||
-                ((evtType == EVT_NODE_FAILED || evtType == EVT_NODE_LEFT) && 
!F.nodeIds(nodes).contains(crd.nodeId()))) {
-                ClusterNode crdNode = null;
-
-                if (crdC != null) {
-                    crdNode = crdC.apply(nodes);
-
-                    if (log.isInfoEnabled())
-                        log.info("Assigned coordinator using test closure: " + 
crd);
-                }
-                else {
-                    // Expect nodes are sorted by order.
-                    for (ClusterNode node : nodes) {
-                        if (!node.isClient()) {
-                            crdNode = node;
-
-                            break;
-                        }
-                    }
-                }
-
-                crd = crdNode != null ? new MvccCoordinator(crdNode.id(), 
coordinatorVersion(topVer), new AffinityTopologyVersion(topVer, 0)) : null;
-
-                if (crd != null) {
-                    if (log.isInfoEnabled())
-                        log.info("Assigned mvcc coordinator [crd=" + crd + ", 
crdNode=" + crdNode + ']');
-                }
-                else
-                    U.warn(log, "New mvcc coordinator was not assigned 
[topVer=" + topVer + ']');
-            }
-        }
-
-        discoData = new MvccDiscoveryData(crd);
+        if (evtType == EVT_DISCOVERY_CUSTOM_EVT)
+            checkMvccCacheStarted(customMsg);
+        else
+            assignMvccCoordinator(evtType, nodes, topVer);
     }
 
     /** {@inheritDoc} */
@@ -384,8 +383,8 @@ class MvccProcessorImpl extends GridProcessorAdapter implements MvccProcessor, D
     }
 
     /** {@inheritDoc} */
-    @Override public void onExchangeDone(boolean newCoord, DiscoCache 
discoCache, Map<UUID, GridLongList> activeQueries) {
-        if (!newCoord)
+    @Override public void onExchangeDone(boolean newCrd, DiscoCache 
discoCache, Map<UUID, GridLongList> activeQueries) {
+        if (!newCrd)
             return;
 
         ctx.cache().context().tm().rollbackMvccTxOnCoordinatorChange();
@@ -406,7 +405,7 @@ class MvccProcessorImpl extends GridProcessorAdapter implements MvccProcessor, D
             log.info("Initialize local node as mvcc coordinator [node=" + 
ctx.localNodeId() +
                 ", crdVer=" + crdVer + ']');
 
-            prevCrdQueries.init(activeQueries, discoCache, ctx.discovery());
+            prevCrdQueries.init(activeQueries, F.view(discoCache.allNodes(), 
this::supportsMvcc), ctx.discovery());
 
             initFut.onDone();
         }
@@ -435,8 +434,8 @@ class MvccProcessorImpl extends GridProcessorAdapter implements MvccProcessor, D
     }
 
     /** {@inheritDoc} */
-    @Override @Nullable public MvccCoordinator coordinatorFromDiscoveryEvent() 
{
-        return discoData != null ? discoData.coordinator() : null;
+    @Override @Nullable public MvccCoordinator assignedCoordinator() {
+        return assignedCrd;
     }
 
     /** {@inheritDoc} */
@@ -458,6 +457,8 @@ class MvccProcessorImpl extends GridProcessorAdapter implements MvccProcessor, D
 
     /** {@inheritDoc} */
     @Override public byte state(MvccVersion ver) throws IgniteCheckedException 
{
+        assert txLog != null && mvccEnabled;
+
         return txLog.get(ver.coordinatorVersion(), ver.counter());
     }
 
@@ -468,6 +469,8 @@ class MvccProcessorImpl extends GridProcessorAdapter implements MvccProcessor, D
 
     /** {@inheritDoc} */
     @Override public void updateState(MvccVersion ver, byte state, boolean 
primary) throws IgniteCheckedException {
+        assert txLog != null && mvccEnabled;
+
         TxKey key = new TxKey(ver.coordinatorVersion(), ver.counter());
 
         txLog.put(key, state, primary);
@@ -808,12 +811,104 @@ class MvccProcessorImpl extends GridProcessorAdapter implements MvccProcessor, D
         return ctx.config().getDataStorageConfiguration();
     }
 
+    /** */
+    private void assignMvccCoordinator(int evtType, Collection<ClusterNode> 
nodes, long topVer) {
+        checkMvccSupported(nodes);
+
+        MvccCoordinator crd;
+
+        if (evtType == EVT_NODE_SEGMENTED || evtType == 
EVT_CLIENT_NODE_DISCONNECTED)
+            crd = null;
+        else {
+            crd = assignedCrd;
+
+            if (crd == null ||
+                ((evtType == EVT_NODE_FAILED || evtType == EVT_NODE_LEFT) && 
!F.nodeIds(nodes).contains(crd.nodeId()))) {
+                ClusterNode crdNode = null;
+
+                if (crdC != null) {
+                    crdNode = crdC.apply(nodes);
+
+                    if (log.isInfoEnabled())
+                        log.info("Assigned coordinator using test closure: " + 
crd);
+                }
+                else {
+                    // Expect nodes are sorted by order.
+                    for (ClusterNode node : nodes) {
+                        if (!node.isClient() && supportsMvcc(node)) {
+                            crdNode = node;
+
+                            break;
+                        }
+                    }
+                }
+
+                crd = crdNode != null ? new MvccCoordinator(crdNode.id(), 
coordinatorVersion(crdNode),
+                    new AffinityTopologyVersion(topVer, 0)) : null;
+
+                if (log.isInfoEnabled() && crd != null)
+                    log.info("Assigned mvcc coordinator [crd=" + crd + ", 
crdNode=" + crdNode + ']');
+                else if (crd == null)
+                    U.warn(log, "New mvcc coordinator was not assigned 
[topVer=" + topVer + ']');
+            }
+        }
+
+        assignedCrd = crd;
+    }
+
     /**
-     * @param topVer Topology version.
+     * @param crdNode Assigned coordinator node.
      * @return Coordinator version.
      */
-    private long coordinatorVersion(long topVer) {
-        return topVer + ctx.discovery().gridStartTime();
+    private long coordinatorVersion(ClusterNode crdNode) {
+        return crdNode.order() + ctx.discovery().gridStartTime();
+    }
+
+    /** */
+    private void checkMvccSupported(Collection<ClusterNode> nodes) {
+        if (mvccEnabled) {
+            assert mvccSupported;
+
+            return;
+        }
+
+        boolean res = true, was = mvccSupported;
+
+        for (ClusterNode node : nodes) {
+            if (!supportsMvcc(node)) {
+                res = false;
+
+                break;
+            }
+        }
+
+        if (was != res)
+            mvccSupported = res;
+    }
+
+    /** */
+    private boolean supportsMvcc(ClusterNode node) {
+        return node.version().compareToIgnoreTimestamp(MVCC_SUPPORTED_SINCE) 
>= 0;
+    }
+
+    /** */
+    private void checkMvccCacheStarted(@Nullable DiscoveryCustomMessage 
customMsg) {
+        assert customMsg != null;
+
+        if (!mvccEnabled && customMsg instanceof DynamicCacheChangeBatch) {
+            for (DynamicCacheChangeRequest req : 
((DynamicCacheChangeBatch)customMsg).requests()) {
+                CacheConfiguration ccfg = req.startCacheConfiguration();
+
+                if (ccfg == null)
+                    continue;
+
+                if (ccfg.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT) {
+                    assert mvccSupported;
+
+                    mvccEnabled = true;
+                }
+            }
+        }
     }
 
     /**
@@ -952,11 +1047,11 @@ class MvccProcessorImpl extends GridProcessorAdapter implements MvccProcessor, D
 
                     cleanupQueue = new LinkedBlockingQueue<>();
 
-                    vacuumWorkers = new 
ArrayList<>(ctx.config().getMvccVacuumThreadCnt() + 1);
+                    vacuumWorkers = new 
ArrayList<>(ctx.config().getMvccVacuumThreadCount() + 1);
 
                     vacuumWorkers.add(new VacuumScheduler(ctx, log, this));
 
-                    for (int i = 0; i < ctx.config().getMvccVacuumThreadCnt(); 
i++) {
+                    for (int i = 0; i < 
ctx.config().getMvccVacuumThreadCount(); i++) {
                         vacuumWorkers.add(new VacuumWorker(ctx, log, 
cleanupQueue));
                     }
 
@@ -1895,7 +1990,7 @@ class MvccProcessorImpl extends GridProcessorAdapter implements MvccProcessor, D
         VacuumScheduler(GridKernalContext ctx, IgniteLogger log, 
MvccProcessorImpl prc) {
             super(ctx.igniteInstanceName(), "vacuum-scheduler", log);
 
-            this.interval = ctx.config().getMvccVacuumTimeInterval();
+            this.interval = ctx.config().getMvccVacuumFrequency();
             this.prc = prc;
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab94934/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
index c75393e..0422459 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
@@ -18,7 +18,7 @@
 package org.apache.ignite.internal.processors.cache.mvcc;
 
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.configuration.TransactionConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
 import 
org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
@@ -102,14 +102,6 @@ public class MvccUtils {
     }
 
     /**
-     * @param ctx Kernal context.
-     * @return Newly created Mvcc processor.
-     */
-    public static MvccProcessor createProcessor(GridKernalContext ctx) {
-        return mvccEnabled(ctx) ? new MvccProcessorImpl(ctx) : new 
NoOpMvccProcessor(ctx);
-    }
-
-    /**
      * @param cctx Cache context.
      * @param mvccCrd Mvcc coordinator version.
      * @param mvccCntr Mvcc counter.
@@ -720,10 +712,10 @@ public class MvccUtils {
 
     /**
      * @param ctx Grid kernal context.
-     * @return Whether MVCC is enabled or not on {@link IgniteConfiguration}.
+     * @return Whether MVCC is enabled or not.
      */
     public static boolean mvccEnabled(GridKernalContext ctx) {
-        return ctx.config().isMvccEnabled();
+        return ctx.coordinators().mvccEnabled();
     }
 
     /**
@@ -794,6 +786,19 @@ public class MvccUtils {
         return snapshot;
     }
 
+    /**
+     * Throws atomicity modes compatibility validation exception.
+     *
+     * @param ctx1 Cache context.
+     * @param ctx2 Another cache context.
+     */
+    public static void throwAtomicityModesMismatchException(GridCacheContext 
ctx1, GridCacheContext ctx2) {
+        throw new IgniteException("Caches with transactional_snapshot 
atomicity mode cannot participate in the same" +
+            " transaction with caches having another atomicity mode. 
[cacheName=" + ctx1.name() +
+            ", cacheMode=" + ctx1.config().getAtomicityMode() +
+            ", anotherCacheName=" + ctx2.name() + " anotherCacheMode=" + 
ctx2.config().getAtomicityMode() + ']');
+    }
+
     /** */
     private static MvccVersion mvccVersion(long crd, long cntr, int opCntr) {
         return new MvccVersionImpl(crd, cntr, opCntr);

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab94934/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/NoOpMvccProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/NoOpMvccProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/NoOpMvccProcessor.java
deleted file mode 100644
index b9a5132..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/NoOpMvccProcessor.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.mvcc;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.UUID;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.IgniteDiagnosticPrepareContext;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.managers.discovery.DiscoCache;
-import org.apache.ignite.internal.processors.GridProcessorAdapter;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.ExchangeContext;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
-import org.apache.ignite.internal.util.GridLongList;
-import org.jetbrains.annotations.Nullable;
-
-/**
- *
- */
-class NoOpMvccProcessor extends GridProcessorAdapter implements MvccProcessor {
-    /**
-     * @param ctx Kernal context.
-     */
-    protected NoOpMvccProcessor(GridKernalContext ctx) {
-        super(ctx);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onDiscoveryEvent(int evtType, 
Collection<ClusterNode> nodes, long topVer) {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onExchangeStart(MvccCoordinator mvccCrd, 
ExchangeContext exchCtx, ClusterNode exchCrd) {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onExchangeDone(boolean newCoord, DiscoCache 
discoCache,
-        Map<UUID, GridLongList> activeQueries) {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public void processClientActiveQueries(UUID nodeId, @Nullable 
GridLongList activeQueries) {
-        throw processorException();
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public MvccCoordinator currentCoordinator() {
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public MvccCoordinator 
currentCoordinator(AffinityTopologyVersion topVer) {
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public MvccCoordinator coordinatorFromDiscoveryEvent() 
{
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public UUID currentCoordinatorId() {
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void updateCoordinator(MvccCoordinator curCrd) {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte state(long crdVer, long cntr) {
-        throw processorException();
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte state(MvccVersion ver) {
-        throw processorException();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void updateState(MvccVersion ver, byte state) {
-        throw processorException();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void updateState(MvccVersion ver, byte state, boolean 
primary) {
-        throw processorException();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void registerLocalTransaction(long crd, long cntr) {
-        throw processorException();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean hasLocalTransaction(long crd, long cntr) {
-        throw processorException();
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<Void> waitFor(GridCacheContext cctx,
-        MvccVersion locked) {
-        throw processorException();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void addQueryTracker(MvccQueryTracker tracker) {
-        throw processorException();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void removeQueryTracker(Long id) {
-        throw processorException();
-    }
-
-    /** {@inheritDoc} */
-    @Override public MvccSnapshot tryRequestSnapshotLocal() {
-        throw processorException();
-    }
-
-    /** {@inheritDoc} */
-    @Override public MvccSnapshot tryRequestSnapshotLocal(
-        @Nullable IgniteInternalTx tx) {
-        throw processorException();
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<MvccSnapshot> requestSnapshotAsync() 
{
-        throw processorException();
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<MvccSnapshot> 
requestSnapshotAsync(IgniteInternalTx tx) {
-        throw processorException();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void requestSnapshotAsync(MvccSnapshotResponseListener 
lsnr) {
-        throw processorException();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void requestSnapshotAsync(IgniteInternalTx tx, 
MvccSnapshotResponseListener lsnr) {
-        throw processorException();
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<Void> ackTxCommit(MvccSnapshot 
updateVer) {
-        throw processorException();
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<Void> ackTxCommit(MvccVersion 
updateVer, MvccSnapshot readSnapshot,
-        long qryId) {
-        throw processorException();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void ackTxRollback(MvccVersion updateVer) {
-        throw processorException();
-    }
-
-    /** {@inheritDoc} */
-   @Override public void ackTxRollback(MvccVersion updateVer, MvccSnapshot 
readSnapshot, long qryTrackerId) {
-       throw processorException();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void ackQueryDone(MvccSnapshot snapshot, long qryId) {
-        throw processorException();
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<Void> waitTxsFuture(UUID crdId, 
GridLongList txs) {
-        throw processorException();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void dumpDebugInfo(IgniteLogger log, @Nullable 
IgniteDiagnosticPrepareContext diagCtx) {
-        // No-op.
-    }
-
-    /**
-     * @return No-op processor usage exception;
-     */
-    private IgniteException processorException() {
-        return new IgniteException("Current Ignite configuration does not 
support MVCC functionality " +
-            "(consider adding ignite-schedule module to classpath).");
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab94934/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 0470fad..ab60f47 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -497,7 +497,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         final boolean includeExpired) throws IgniteCheckedException
     {
         //TODO IGNITE-7953
-        if (!cctx.atomic() && cctx.kernalContext().config().isMvccEnabled())
+        if (cctx.transactionalSnapshot())
             throw new UnsupportedOperationException("Continuous queries are 
not supported for transactional caches " +
                 "when MVCC is enabled.");
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab94934/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
index 7dda67f..97ce20a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
@@ -45,6 +45,7 @@ import org.apache.ignite.internal.processors.authentication.AuthorizationContext
 import 
org.apache.ignite.internal.processors.bulkload.BulkLoadAckClientParameters;
 import org.apache.ignite.internal.processors.bulkload.BulkLoadProcessor;
 import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
 import 
org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion;
@@ -190,10 +191,8 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
 
         log = ctx.log(getClass());
 
-        if (ctx.grid().configuration().isMvccEnabled())
-            worker = new JdbcRequestHandlerWorker(ctx.igniteInstanceName(), 
log, this, ctx);
-        else
-            worker = null;
+        // TODO IGNITE-9484 Do not create worker if there is a possibility to 
unbind TX from threads.
+        worker = new JdbcRequestHandlerWorker(ctx.igniteInstanceName(), log, 
this, ctx);
     }
 
     /** {@inheritDoc} */
@@ -204,7 +203,7 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
 
         JdbcRequest req = (JdbcRequest)req0;
 
-        if (worker == null)
+        if (!MvccUtils.mvccEnabled(ctx))
             return doHandle(req);
         else {
             GridFutureAdapter<ClientListenerResponse> fut = 
worker.process(req);

Reply via email to