Compatibility fix for CacheContinuousQueryBatchAck message. (cherry picked from commit faa77e2)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/aad672b5 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/aad672b5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/aad672b5 Branch: refs/heads/ignite-2801 Commit: aad672b591d87cdddd04774bf4787468398feb1b Parents: fbff90e Author: sboikov <[email protected]> Authored: Wed Feb 24 19:32:57 2016 +0300 Committer: sboikov <[email protected]> Committed: Fri Feb 26 15:23:19 2016 +0300 ---------------------------------------------------------------------- .../query/continuous/CacheContinuousQueryBatchAck.java | 4 ++++ .../query/continuous/CacheContinuousQueryHandler.java | 10 +++++----- .../processors/continuous/GridContinuousProcessor.java | 1 + 3 files changed, 10 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/aad672b5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java index 7db9026..26e2b05 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java @@ -24,6 +24,7 @@ import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; @@ -33,6 +34,9 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter; */ public class CacheContinuousQueryBatchAck extends GridCacheMessage { /** */ + public static final IgniteProductVersion SINCE_VER = IgniteProductVersion.fromString("1.5.0"); + + /** */ private static final long serialVersionUID = 0L; /** Routine ID. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/aad672b5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index fc14e82..4397f69 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -152,7 +152,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler private AffinityTopologyVersion initTopVer; /** */ - private transient boolean ignoreClassNotFound; + private transient boolean ignoreClsNotFound; /** * Required by {@link Externalizable}. @@ -192,7 +192,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler boolean skipPrimaryCheck, boolean locCache, boolean keepBinary, - boolean ignoreClassNotFound) { + boolean ignoreClsNotFound) { assert topic != null; assert locLsnr != null; @@ -209,7 +209,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler this.skipPrimaryCheck = skipPrimaryCheck; this.locCache = locCache; this.keepBinary = keepBinary; - this.ignoreClassNotFound = ignoreClassNotFound; + this.ignoreClsNotFound = ignoreClsNotFound; cacheId = CU.cacheId(cacheName); } @@ -593,7 +593,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler entries0.addAll(handleEvent(ctx, e)); } catch (IgniteCheckedException ex) { - if (ignoreClassNotFound) + if (ignoreClsNotFound) assert internal; else U.error(ctx.log(getClass()), "Failed to unmarshal entry.", ex); @@ -997,7 +997,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler nodes.addAll(ctx.discovery().cacheAffinityNodes(cctx.name(), topVer)); for (ClusterNode node : nodes) { - if (!node.isLocal()) { + if (!node.isLocal() && node.version().compareTo(CacheContinuousQueryBatchAck.SINCE_VER) >= 0) { try { cctx.io().send(node, msg, GridIoPolicy.SYSTEM_POOL); } http://git-wip-us.apache.org/repos/asf/ignite/blob/aad672b5/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 441d795..1ec69c2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -45,6 +45,7 @@ import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteDeploymentCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.deployment.GridDeployment;
