Merge remote-tracking branch 'remotes/origin/master' into ignite-2.1.2 # Conflicts: # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/96c37c6e Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/96c37c6e Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/96c37c6e Branch: refs/heads/ignite-2.1.2-exchange Commit: 96c37c6e22d72417df63db0aeac2b6ccd8eb6eb7 Parents: 28d72d8 807cab2 Author: sboikov <[email protected]> Authored: Mon Jun 19 20:21:33 2017 +0300 Committer: sboikov <[email protected]> Committed: Mon Jun 19 20:21:33 2017 +0300 ---------------------------------------------------------------------- .../cassandra/common/PropertyMappingHelper.java | 73 +++----- .../persistence/KeyPersistenceSettings.java | 155 +++++----------- .../persistence/PersistenceSettings.java | 81 +++++++- .../store/cassandra/persistence/PojoField.java | 129 +++++++------ .../persistence/PojoFieldAccessor.java | 162 ++++++++++++++++ .../cassandra/persistence/PojoKeyField.java | 13 +- .../cassandra/persistence/PojoValueField.java | 12 +- .../persistence/ValuePersistenceSettings.java | 50 +---- .../store/cassandra/utils/DDLGenerator.java | 9 +- .../tests/CassandraDirectPersistenceTest.java | 81 +++++++- .../apache/ignite/tests/DDLGeneratorTest.java | 4 + .../ignite/tests/IgnitePersistentStoreTest.java | 91 ++++++++- .../apache/ignite/tests/pojos/SimplePerson.java | 186 +++++++++++++++++++ .../ignite/tests/pojos/SimplePersonId.java | 89 +++++++++ .../apache/ignite/tests/utils/TestsHelper.java | 103 +++++++++- .../tests/persistence/pojo/ignite-config.xml | 36 ++++ .../persistence/pojo/persistence-settings-5.xml | 21 +++ .../persistence/pojo/persistence-settings-6.xml | 174 +++++++++++++++++ .../apache/ignite/IgniteSystemProperties.java | 3 + .../GridCachePartitionExchangeManager.java | 12 +- .../dht/GridClientPartitionTopology.java | 20 +- .../dht/GridDhtPartitionTopology.java | 8 +- .../dht/GridDhtPartitionTopologyImpl.java | 22 +-- ...chePartitionExchangeManagerHistSizeTest.java | 76 ++++++++ .../distributed/IgniteCacheGetRestartTest.java | 4 +- .../testsuites/IgniteCacheTestSuite5.java | 3 + .../rocketmq/RocketMQStreamerTestSuite.java | 2 +- parent/pom.xml | 2 +- 28 files changed, 1296 insertions(+), 325 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/96c37c6e/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/96c37c6e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index fe1fd14,dea690e..f4c7fce --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@@ -1278,7 -1272,7 +1278,7 @@@ public class GridCachePartitionExchange top = grp.topology(); if (top != null) - updated |= top.update(null, entry.getValue(), null, msg.partsToReload(cctx.localNodeId(), grpId)) != null; - updated |= top.update(null, entry.getValue(), null); ++ updated |= top.update(null, entry.getValue(), null, msg.partsToReload(cctx.localNodeId(), grpId)); } if (!cctx.kernalContext().clientNode() && updated) http://git-wip-us.apache.org/repos/asf/ignite/blob/96c37c6e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index a17df36,9c769a9..2b16a98 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@@ -573,12 -562,10 +573,12 @@@ public class GridClientPartitionTopolog /** {@inheritDoc} */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) - @Nullable @Override public GridDhtPartitionMap update( + @Override public boolean update( @Nullable AffinityTopologyVersion exchVer, GridDhtPartitionFullMap partMap, - Map<Integer, T2<Long, Long>> cntrMap) { + Map<Integer, T2<Long, Long>> cntrMap, + Set<Integer> partsToReload + ) { if (log.isDebugEnabled()) log.debug("Updating full partition map [exchVer=" + exchVer + ", parts=" + fullMapString() + ']'); @@@ -688,23 -674,9 +688,23 @@@ } } + /** + * Method checks is new partition map more stale than current partition map + * New partition map is stale if topology version or update sequence are less than of current map + * + * @param currentMap Current partition map + * @param newMap New partition map + * @return True if new partition map is more stale than current partition map, false in other case + */ + private boolean isStaleUpdate(GridDhtPartitionMap currentMap, GridDhtPartitionMap newMap) { + return currentMap != null && + (newMap.topologyVersion().compareTo(currentMap.topologyVersion()) < 0 || + newMap.topologyVersion().compareTo(currentMap.topologyVersion()) == 0 && newMap.updateSequence() <= currentMap.updateSequence()); + } + /** {@inheritDoc} */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) - @Nullable @Override public GridDhtPartitionMap update( + @Override public boolean update( @Nullable GridDhtPartitionExchangeId exchId, GridDhtPartitionMap parts ) { @@@ -742,12 -714,12 +742,12 @@@ GridDhtPartitionMap cur = node2part.get(parts.nodeId()); - if (cur != null && cur.updateSequence() >= parts.updateSequence()) { + if (isStaleUpdate(cur, parts)) { if (log.isDebugEnabled()) - log.debug("Stale update sequence for single partition map update (will ignore) [exchId=" + exchId + - ", curSeq=" + cur.updateSequence() + ", newSeq=" + parts.updateSequence() + ']'); + log.debug("Stale update for single partition map update (will ignore) [exchId=" + exchId + + ", curMap=" + cur + ", newMap=" + parts + ']'); - return null; + return false; } long updateSeq = this.updateSeq.incrementAndGet(); http://git-wip-us.apache.org/repos/asf/ignite/blob/96c37c6e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index 61ebbb1,3c38942..bf4e844 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@@ -237,13 -225,12 +237,13 @@@ public interface GridDhtPartitionTopolo * @param exchangeVer Exchange version. * @param partMap Update partition map. * @param cntrMap Partition update counters. - * @return Local partition map if there were evictions or {@code null} otherwise. + * @return {@code True} if local state was changed. */ - public GridDhtPartitionMap update( + public boolean update( @Nullable AffinityTopologyVersion exchangeVer, GridDhtPartitionFullMap partMap, - @Nullable Map<Integer, T2<Long, Long>> cntrMap); + @Nullable Map<Integer, T2<Long, Long>> cntrMap, + Set<Integer> partsToReload); /** * @param exchId Exchange ID. http://git-wip-us.apache.org/repos/asf/ignite/blob/96c37c6e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 1620633,f3170fa..ce19c6b --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@@ -1043,27 -1108,12 +1043,27 @@@ public class GridDhtPartitionTopologyIm } } + /** + * Checks should current partition map overwritten by new partition map + * Method returns true if topology version or update sequence of new map are greater than of current map + * + * @param currentMap Current partition map + * @param newMap New partition map + * @return True if current partition map should be overwritten by new partition map, false in other case + */ + private boolean shouldOverridePartitionMap(GridDhtPartitionMap currentMap, GridDhtPartitionMap newMap) { + return newMap != null && + (newMap.topologyVersion().compareTo(currentMap.topologyVersion()) > 0 || + newMap.topologyVersion().compareTo(currentMap.topologyVersion()) == 0 && newMap.updateSequence() > currentMap.updateSequence()); + } + /** {@inheritDoc} */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) - @Override public GridDhtPartitionMap update( + @Override public boolean update( @Nullable AffinityTopologyVersion exchangeVer, GridDhtPartitionFullMap partMap, - @Nullable Map<Integer, T2<Long, Long>> cntrMap + @Nullable Map<Integer, T2<Long, Long>> cntrMap, + Set<Integer> partsToReload ) { if (log.isDebugEnabled()) log.debug("Updating full partition map [exchVer=" + exchangeVer + ", parts=" + fullMapString() + ']'); @@@ -1104,17 -1154,19 +1104,17 @@@ log.debug("Stale exchange id for full partition map update (will ignore) [lastExch=" + lastExchangeVer + ", exch=" + exchangeVer + ']'); - return null; + return false; } - if (node2part != null && node2part.compareTo(partMap) >= 0) { + if (node2part != null && node2part.compareTo(partMap) > 0) { if (log.isDebugEnabled()) log.debug("Stale partition map for full partition map update (will ignore) [lastExch=" + lastExchangeVer + ", exch=" + exchangeVer + ", curMap=" + node2part + ", newMap=" + partMap + ']'); - return null; + return false; } - long updateSeq = this.updateSeq.incrementAndGet(); - if (exchangeVer != null) lastExchangeVer = exchangeVer; @@@ -1320,23 -1311,9 +1320,23 @@@ } } + /** + * Method checks is new partition map more stale than current partition map + * New partition map is stale if topology version or update sequence are less than of current map + * + * @param currentMap Current partition map + * @param newMap New partition map + * @return True if new partition map is more stale than current partition map, false in other case + */ + private boolean isStaleUpdate(GridDhtPartitionMap currentMap, GridDhtPartitionMap newMap) { + return currentMap != null && + (newMap.topologyVersion().compareTo(currentMap.topologyVersion()) < 0 || + newMap.topologyVersion().compareTo(currentMap.topologyVersion()) == 0 && newMap.updateSequence() <= currentMap.updateSequence()); + } + /** {@inheritDoc} */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) - @Nullable @Override public GridDhtPartitionMap update( + @Override public boolean update( @Nullable GridDhtPartitionExchangeId exchId, GridDhtPartitionMap parts ) { @@@ -1374,12 -1351,12 +1374,12 @@@ GridDhtPartitionMap cur = node2part.get(parts.nodeId()); - if (cur != null && cur.updateSequence() >= parts.updateSequence()) { + if (isStaleUpdate(cur, parts)) { if (log.isDebugEnabled()) - log.debug("Stale update sequence for single partition map update (will ignore) [exchId=" + exchId + - ", curSeq=" + cur.updateSequence() + ", newSeq=" + parts.updateSequence() + ']'); + log.debug("Stale update for single partition map update (will ignore) [exchId=" + exchId + + ", curMap=" + cur + ", newMap=" + parts + ']'); - return null; + return false; } long updateSeq = this.updateSeq.incrementAndGet(); http://git-wip-us.apache.org/repos/asf/ignite/blob/96c37c6e/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/96c37c6e/parent/pom.xml ----------------------------------------------------------------------
