# Merged 6.6.3 fixes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a86ae903 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a86ae903 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a86ae903 Branch: refs/heads/ignite-160 Commit: a86ae903e337140ddd18e966921e0de9d70ae79f Parents: 46160c9 Author: vozerov-gridgain <[email protected]> Authored: Thu Feb 5 16:58:14 2015 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Thu Feb 5 16:58:14 2015 +0300 ---------------------------------------------------------------------- .../discovery/GridDiscoveryManager.java | 22 +++---- .../affinity/GridAffinityAssignmentCache.java | 23 ++++++- .../processors/cache/GridCacheMapEntry.java | 10 ++- .../GridCacheContinuousQueryAdapter.java | 45 +++++++++----- .../GridCacheContinuousQueryHandler.java | 15 ++++- .../continuous/GridContinuousProcessor.java | 15 ++++- .../portable/GridPortableInputStream.java | 7 +++ ...dCacheContinuousQueryReplicatedSelfTest.java | 65 ++++++++++++++++++++ .../hadoop/jobtracker/GridHadoopJobTracker.java | 2 +- 9 files changed, 160 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a86ae903/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 217a2aa..b612465e4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -1507,15 +1507,16 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { private void onSegmentation() { GridSegmentationPolicy segPlc = ctx.config().getSegmentationPolicy(); + // Always disconnect first. + try { + getSpi().disconnect(); + } + catch (IgniteSpiException e) { + U.error(log, "Failed to disconnect discovery SPI.", e); + } + switch (segPlc) { case RESTART_JVM: - try { - getSpi().disconnect(); - } - catch (IgniteSpiException e) { - U.error(log, "Failed to disconnect discovery SPI.", e); - } - U.warn(log, "Restarting JVM according to configured segmentation policy."); restartJvm(); @@ -1523,13 +1524,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { break; case STOP: - try { - getSpi().disconnect(); - } - catch (IgniteSpiException e) { - U.error(log, "Failed to disconnect discovery SPI.", e); - } - U.warn(log, "Stopping local node according to configured segmentation policy."); stopNode(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a86ae903/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java index cc447ea..42c3b5d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.affinity; import org.apache.ignite.*; +import org.apache.ignite.cache.*; import org.apache.ignite.cache.affinity.*; import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; @@ -35,6 +36,8 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import static org.apache.ignite.cache.CacheDistributionMode.*; + /** * Affinity cached function. */ @@ -121,6 +124,7 @@ public class GridAffinityAssignmentCache { * @param topVer Topology version to calculate affinity cache for. * @param discoEvt Discovery event that caused this topology version change. */ + @SuppressWarnings("IfMayBeConditional") public List<List<ClusterNode>> calculate(long topVer, IgniteDiscoveryEvent discoEvt) { if (log.isDebugEnabled()) log.debug("Calculating affinity [topVer=" + topVer + ", locNodeId=" + ctx.localNodeId() + @@ -142,8 +146,23 @@ public class GridAffinityAssignmentCache { List<List<ClusterNode>> prevAssignment = prev == null ? null : prev.assignment(); - List<List<ClusterNode>> assignment = aff.assignPartitions( - new GridCacheAffinityFunctionContextImpl(sorted, prevAssignment, discoEvt, topVer, backups)); + List<List<ClusterNode>> assignment; + + if (prevAssignment != null && discoEvt != null) { + CacheDistributionMode distroMode = U.distributionMode(discoEvt.eventNode(), ctx.name()); + + if (distroMode == null || // no cache on node. + distroMode == CLIENT_ONLY || distroMode == NEAR_ONLY) + assignment = prevAssignment; + else + assignment = aff.assignPartitions(new GridCacheAffinityFunctionContextImpl(sorted, prevAssignment, + discoEvt, topVer, backups)); + } + else + assignment = aff.assignPartitions(new GridCacheAffinityFunctionContextImpl(sorted, prevAssignment, discoEvt, + topVer, backups)); + + assert assignment != null; GridAffinityAssignment updated = new GridAffinityAssignment(topVer, assignment); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a86ae903/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 6aafc5d..96ceb93 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1167,8 +1167,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> CacheMode mode = cctx.config().getCacheMode(); - if (mode == CacheMode.LOCAL || mode == CacheMode.REPLICATED || - (tx != null && tx.local() && !isNear())) + if (cctx.isLocal() || cctx.isReplicated() || (tx != null && tx.local() && !isNear())) cctx.continuousQueries().onEntryUpdate(this, key, val, valueBytesUnlocked(), old, oldBytes, false); cctx.dataStructures().onEntryUpdated(key, false); @@ -1329,8 +1328,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> CacheMode mode = cctx.config().getCacheMode(); - if (mode == CacheMode.LOCAL || mode == CacheMode.REPLICATED || - (tx != null && tx.local() && !isNear())) + if (cctx.isLocal() || cctx.isReplicated() || (tx != null && tx.local() && !isNear())) cctx.continuousQueries().onEntryUpdate(this, key, null, null, old, oldBytes, false); cctx.dataStructures().onEntryUpdated(key, true); @@ -2144,7 +2142,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> if (res) updateMetrics(op, metrics); - if (primary || cctx.isReplicated()) + if (primary) cctx.continuousQueries().onEntryUpdate(this, key, val, valueBytesUnlocked(), old, oldBytes, false); cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE); @@ -3143,7 +3141,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> drReplicate(drType, val, valBytes, ver); if (!skipQryNtf) { - if (cctx.isReplicated() || cctx.affinity().primary(cctx.localNode(), key, topVer)) { + if (cctx.affinity().primary(cctx.localNode(), key, topVer)) { cctx.continuousQueries().onEntryUpdate(this, key, val, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a86ae903/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java index ba34d6b..acd96ed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java @@ -35,7 +35,7 @@ import javax.cache.event.*; import java.util.*; import java.util.concurrent.locks.*; -import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheDistributionMode.*; /** * Continuous query implementation. @@ -228,27 +228,39 @@ public class GridCacheContinuousQueryAdapter<K, V> implements CacheContinuousQue prj = prj.forCacheNodes(ctx.name()); if (prj.nodes().isEmpty()) - throw new ClusterTopologyCheckedException("Failed to execute query (projection is empty): " + this); + throw new ClusterTopologyCheckedException("Failed to continuous execute query (projection is empty): " + + this); - CacheMode mode = ctx.config().getCacheMode(); + boolean skipPrimaryCheck = false; - if (mode == LOCAL || mode == REPLICATED) { - Collection<ClusterNode> nodes = prj.nodes(); + Collection<ClusterNode> nodes = prj.nodes(); - ClusterNode node = nodes.contains(ctx.localNode()) ? ctx.localNode() : F.rand(nodes); + if (nodes.isEmpty()) + throw new ClusterTopologyCheckedException("Failed to execute continuous query (empty projection is " + + "provided): " + this); - assert node != null; + switch (ctx.config().getCacheMode()) { + case LOCAL: + if (!nodes.contains(ctx.localNode())) + throw new ClusterTopologyCheckedException("Continuous query for LOCAL cache can be executed " + + "only locally (provided projection contains remote nodes only): " + this); + else if (nodes.size() > 1) + U.warn(log, "Continuous query for LOCAL cache will be executed locally (provided projection is " + + "ignored): " + this); - if (nodes.size() > 1) { - if (node.id().equals(ctx.localNodeId())) - U.warn(log, "Continuous query for " + mode + " cache can be run only on local node. " + - "Will execute query locally: " + this); - else - U.warn(log, "Continuous query for " + mode + " cache can be run only on single node. " + - "Will execute query on remote node [qry=" + this + ", node=" + node + ']'); - } + prj = prj.forNode(ctx.localNode()); - prj = prj.forNode(node); + break; + + case REPLICATED: + if (nodes.size() == 1 && F.first(nodes).equals(ctx.localNode())) { + CacheDistributionMode distributionMode = ctx.config().getDistributionMode(); + + if (distributionMode == PARTITIONED_ONLY || distributionMode == NEAR_PARTITIONED) + skipPrimaryCheck = true; + } + + break; } closeLock.lock(); @@ -271,6 +283,7 @@ public class GridCacheContinuousQueryAdapter<K, V> implements CacheContinuousQue entryLsnr, sync, oldVal, + skipPrimaryCheck, taskNameHash, keepPortable); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a86ae903/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java index 350b9b8..a03b9db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java @@ -84,6 +84,9 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler { /** Keep portable flag. */ private boolean keepPortable; + /** Whether to skip primary check for REPLICATED cache. */ + private transient boolean skipPrimaryCheck; + /** * Required by {@link Externalizable}. */ @@ -103,6 +106,7 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler { * @param entryLsnr {@code True} if query created for {@link CacheEntryListener}. * @param sync {@code True} if query created for synchronous {@link CacheEntryListener}. * @param oldVal {@code True} if old value is required. + * @param skipPrimaryCheck Whether to skip primary check for REPLICATED cache. * @param taskHash Task name hash code. */ GridCacheContinuousQueryHandler(@Nullable String cacheName, @@ -114,6 +118,7 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler { boolean entryLsnr, boolean sync, boolean oldVal, + boolean skipPrimaryCheck, int taskHash, boolean keepPortable) { assert topic != null; @@ -131,6 +136,7 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler { this.oldVal = oldVal; this.taskHash = taskHash; this.keepPortable = keepPortable; + this.skipPrimaryCheck = skipPrimaryCheck; } /** {@inheritDoc} */ @@ -184,16 +190,21 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler { } @Override public void onEntryUpdate(GridCacheContinuousQueryEntry<K, V> e, boolean recordEvt) { + GridCacheContext<K, V> cctx = cacheContext(ctx); + + if (cctx.isReplicated() && !skipPrimaryCheck && !e.primary()) + return; + boolean notify; - CacheFlag[] f = cacheContext(ctx).forceLocalRead(); + CacheFlag[] f = cctx.forceLocalRead(); try { notify = (prjPred == null || checkProjection(e)) && (filter == null || filter.apply(e)); } finally { - cacheContext(ctx).forceFlags(f); + cctx.forceFlags(f); } if (notify) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a86ae903/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 93df61f..bc66a2a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -1276,7 +1276,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { * @return Object to send or {@code null} if there is nothing to send for now. */ @Nullable Collection<Object> add(@Nullable Object obj) { - Collection<Object> toSnd = null; + ConcurrentLinkedDeque8 buf0 = null; if (buf.sizex() >= bufSize - 1) { lock.writeLock().lock(); @@ -1284,7 +1284,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { try { buf.add(obj); - toSnd = buf; + buf0 = buf; buf = new ConcurrentLinkedDeque8<>(); @@ -1306,7 +1306,16 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } } - return toSnd != null ? new ArrayList<>(toSnd) : null; + Collection<Object> toSnd = null; + + if (buf0 != null) { + toSnd = new ArrayList<>(buf0.sizex()); + + for (Object o : buf0) + toSnd.add(o); + } + + return toSnd; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a86ae903/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableInputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableInputStream.java index 8ab16f2..3c676bf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableInputStream.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableInputStream.java @@ -174,4 +174,11 @@ public interface GridPortableInputStream extends GridPortableStream { * @return Remaining data. */ public int remaining(); + + /** + * Length of data inside array. + * + * @param len Length. + */ + public void length(int len); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a86ae903/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedSelfTest.java index 39dfda0..dddda8c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedSelfTest.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.query.continuous; import org.apache.ignite.cache.*; import org.apache.ignite.cache.query.*; import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; import java.util.*; import java.util.concurrent.*; @@ -45,6 +46,7 @@ public class GridCacheContinuousQueryReplicatedSelfTest extends GridCacheContinu /** * @throws Exception If failed. */ + @SuppressWarnings("unchecked") public void testRemoteNodeCallback() throws Exception { GridCache<Integer, Integer> cache1 = grid(0).cache(null); @@ -79,4 +81,67 @@ public class GridCacheContinuousQueryReplicatedSelfTest extends GridCacheContinu assertEquals(10, val.get().intValue()); } + + /** + * Ensure that every node see every update. + * + * @throws Exception If failed. + */ + @SuppressWarnings("unchecked") + public void testCrossCallback() throws Exception { + // Prepare. + GridCache<Integer, Integer> cache1 = grid(0).cache(null); + GridCache<Integer, Integer> cache2 = grid(1).cache(null); + + final int key1 = primaryKey(cache1); + final int key2 = primaryKey(cache2); + + final CountDownLatch latch1 = new CountDownLatch(2); + final CountDownLatch latch2 = new CountDownLatch(2); + + + // Start query on the first node. + CacheContinuousQuery<Integer, Integer> qry1 = cache1.queries().createContinuousQuery(); + + qry1.localCallback(new IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() { + @Override public boolean apply(UUID nodeID, + Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) { + for (CacheContinuousQueryEntry entry : entries) { + log.info("Update in cache 1: " + entry); + + if (entry.getKey() == key1 || entry.getKey() == key2) + latch1.countDown(); + } + + return latch1.getCount() != 0; + } + }); + + qry1.execute(); + + // Start query on the second node. + CacheContinuousQuery<Integer, Integer> qry2 = cache2.queries().createContinuousQuery(); + + qry2.localCallback(new IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() { + @Override public boolean apply(UUID nodeID, + Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) { + for (CacheContinuousQueryEntry entry : entries) { + log.info("Update in cache 2: " + entry); + + if (entry.getKey() == key1 || entry.getKey() == key2) + latch2.countDown(); + } + + return latch2.getCount() != 0; + } + }); + + qry2.execute(); + + cache1.put(key1, key1); + cache1.put(key2, key2); + + assert latch1.await(LATCH_TIMEOUT, MILLISECONDS); + assert latch2.await(LATCH_TIMEOUT, MILLISECONDS); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a86ae903/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java index 1b9b4cb..2a771b8 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java @@ -194,7 +194,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent { } }); - qry.execute(); + qry.execute(ctx.kernalContext().grid().forLocal()); ctx.kernalContext().event().addLocalEventListener(new GridLocalEventListener() { @Override public void onEvent(final IgniteEvent evt) {
