Fixing keepBinary for continuous queries.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/01c24e7d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/01c24e7d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/01c24e7d Branch: refs/heads/ignite-1956 Commit: 01c24e7d07e15df8a9a4722c0ec2a9366cb2f669 Parents: eeb9142 Author: Alexey Goncharuk <[email protected]> Authored: Mon Nov 23 21:40:52 2015 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Mon Nov 23 21:40:52 2015 +0300 ---------------------------------------------------------------------- .../ignite/codegen/MessageCodeGenerator.java | 1 + .../internal/GridEventConsumeHandler.java | 5 ++ .../internal/GridMessageListenHandler.java | 5 ++ .../processors/cache/IgniteCacheProxy.java | 14 +++-- .../continuous/CacheContinuousQueryEntry.java | 50 ++++++++++++----- .../continuous/CacheContinuousQueryEvent.java | 6 +-- .../continuous/CacheContinuousQueryHandler.java | 33 +++++++++--- .../CacheContinuousQueryListener.java | 5 ++ .../continuous/CacheContinuousQueryManager.java | 57 ++++++++++++-------- .../continuous/GridContinuousHandler.java | 5 ++ .../continuous/GridContinuousProcessor.java | 10 +++- .../StartRoutineDiscoveryMessage.java | 13 ++++- 12 files changed, 156 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/01c24e7d/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java ---------------------------------------------------------------------- diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java index 5a31415..8733bb3 100644 --- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java +++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java @@ -44,6 +44,7 @@ import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.IgniteCodeGeneratingFail; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEntry; import org.apache.ignite.internal.util.UUIDCollectionMessage; import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.util.typedef.internal.U; http://git-wip-us.apache.org/repos/asf/ignite/blob/01c24e7d/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java index 3918976..f4bbd6f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java @@ -125,6 +125,11 @@ class GridEventConsumeHandler implements GridContinuousHandler { } /** {@inheritDoc} */ + @Override public boolean keepBinary() { + return false; + } + + /** {@inheritDoc} */ @Override public String cacheName() { throw new IllegalStateException(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/01c24e7d/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java index aa837b8..9a2829b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java @@ -98,6 +98,11 @@ public class GridMessageListenHandler implements GridContinuousHandler { } /** {@inheritDoc} */ + @Override public boolean keepBinary() { + return false; + } + + /** {@inheritDoc} */ @Override public String cacheName() { throw new IllegalStateException(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/01c24e7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index cb36432..2a52a1b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -555,7 +555,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V * @return Initial iteration cursor. */ @SuppressWarnings("unchecked") - private QueryCursor<Cache.Entry<K, V>> queryContinuous(ContinuousQuery qry, boolean loc) { + private QueryCursor<Cache.Entry<K, V>> queryContinuous(ContinuousQuery qry, boolean loc, boolean keepBinary) { if (qry.getInitialQuery() instanceof ContinuousQuery) throw new IgniteException("Initial predicate for continuous query can't be an instance of another " + "continuous query. Use SCAN or SQL query for initial iteration."); @@ -570,7 +570,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V qry.getPageSize(), qry.getTimeInterval(), qry.isAutoUnsubscribe(), - loc); + loc, + keepBinary); final QueryCursor<Cache.Entry<K, V>> cur = qry.getInitialQuery() != null ? query(qry.getInitialQuery()) : null; @@ -616,8 +617,11 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V validate(qry); + CacheOperationContext opCtxCall = ctx.operationContextPerCall(); + if (qry instanceof ContinuousQuery) - return (QueryCursor<R>)queryContinuous((ContinuousQuery<K, V>)qry, qry.isLocal()); + return (QueryCursor<R>)queryContinuous((ContinuousQuery<K, V>)qry, qry.isLocal(), + opCtxCall != null && opCtxCall.isKeepBinary()); if (qry instanceof SqlQuery) { final SqlQuery p = (SqlQuery)qry; @@ -1623,7 +1627,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V CacheOperationContext prev = onEnter(gate, opCtx); try { - ctx.continuousQueries().executeJCacheQuery(lsnrCfg, false); + CacheOperationContext opCtx = ctx.operationContextPerCall(); + + ctx.continuousQueries().executeJCacheQuery(lsnrCfg, false, opCtx != null && opCtx.isKeepBinary()); } catch (IgniteCheckedException e) { throw cacheException(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/01c24e7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java index 0495e6d..4d3786a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java @@ -99,6 +99,9 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { /** Filtered events. */ private GridLongList filteredEvts; + /** Keep binary. */ + private boolean keepBinary; + /** * Required by {@link Message}. */ @@ -122,6 +125,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { KeyCacheObject key, @Nullable CacheObject newVal, @Nullable CacheObject oldVal, + boolean keepBinary, int part, long updateCntr, @Nullable AffinityTopologyVersion topVer) { @@ -133,6 +137,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { this.part = part; this.updateCntr = updateCntr; this.topVer = topVer; + this.keepBinary = keepBinary; } /** @@ -203,6 +208,13 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { } /** + * @return Keep binary flag. + */ + boolean isKeepBinary() { + return keepBinary; + } + + /** * @param cntrs Filtered events. */ void filteredEvents(GridLongList cntrs) { @@ -322,36 +334,42 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { writer.incrementState(); case 4: - if (!writer.writeMessage("key", key)) + if (!writer.writeBoolean("keepBinary", keepBinary)) return false; writer.incrementState(); case 5: - if (!writer.writeMessage("newVal", newVal)) + if (!writer.writeMessage("key", key)) return false; writer.incrementState(); case 6: - if (!writer.writeMessage("oldVal", oldVal)) + if (!writer.writeMessage("newVal", newVal)) return false; writer.incrementState(); case 7: - if (!writer.writeInt("part", part)) + if (!writer.writeMessage("oldVal", oldVal)) return false; writer.incrementState(); case 8: - if (!writer.writeMessage("topVer", topVer)) + if (!writer.writeInt("part", part)) return false; writer.incrementState(); case 9: + if (!writer.writeMessage("topVer", topVer)) + return false; + + writer.incrementState(); + + case 10: if (!writer.writeLong("updateCntr", updateCntr)) return false; @@ -407,7 +425,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { reader.incrementState(); case 4: - key = reader.readMessage("key"); + keepBinary = reader.readBoolean("keepBinary"); if (!reader.isLastRead()) return false; @@ -415,7 +433,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { reader.incrementState(); case 5: - newVal = reader.readMessage("newVal"); + key = reader.readMessage("key"); if (!reader.isLastRead()) return false; @@ -423,7 +441,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { reader.incrementState(); case 6: - oldVal = reader.readMessage("oldVal"); + newVal = reader.readMessage("newVal"); if (!reader.isLastRead()) return false; @@ -431,7 +449,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { reader.incrementState(); case 7: - part = reader.readInt("part"); + oldVal = reader.readMessage("oldVal"); if (!reader.isLastRead()) return false; @@ -439,7 +457,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { reader.incrementState(); case 8: - topVer = reader.readMessage("topVer"); + part = reader.readInt("part"); if (!reader.isLastRead()) return false; @@ -447,6 +465,14 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { reader.incrementState(); case 9: + topVer = reader.readMessage("topVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 10: updateCntr = reader.readLong("updateCntr"); if (!reader.isLastRead()) @@ -461,11 +487,11 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 10; + return 11; } /** {@inheritDoc} */ @Override public String toString() { return S.toString(CacheContinuousQueryEntry.class, this); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/01c24e7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java index d26be5f..f665339 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java @@ -59,18 +59,18 @@ class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K, V> { /** {@inheritDoc} */ @Override public K getKey() { - return (K)cctx.cacheObjectContext().unwrapPortableIfNeeded(e.key(), true, false); + return (K)cctx.cacheObjectContext().unwrapPortableIfNeeded(e.key(), e.isKeepBinary(), false); } /** {@inheritDoc} */ @Override public V getValue() { - return (V)cctx.cacheObjectContext().unwrapPortableIfNeeded(e.value(), true, false); + return (V)cctx.cacheObjectContext().unwrapPortableIfNeeded(e.value(), e.isKeepBinary(), false); } /** {@inheritDoc} */ @Override public V getOldValue() { - return (V)cctx.cacheObjectContext().unwrapPortableIfNeeded(e.oldValue(), true, false); + return (V)cctx.cacheObjectContext().unwrapPortableIfNeeded(e.oldValue(), e.isKeepBinary(), false); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/01c24e7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index b69d4cd..aa9bea3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -81,7 +81,7 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ; /** * Continuous query handler. */ -class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { +public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { /** */ private static final long serialVersionUID = 0L; @@ -128,7 +128,10 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { private transient Collection<CacheContinuousQueryEntry> backupQueue; /** */ - private boolean localCache; + private boolean locCache; + + /** */ + private transient boolean keepBinary; /** */ private transient ConcurrentMap<Integer, PartitionRecovery> rcvs; @@ -180,7 +183,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { boolean ignoreExpired, int taskHash, boolean skipPrimaryCheck, - boolean locCache) { + boolean locCache, + boolean keepBinary) { assert topic != null; assert locLsnr != null; @@ -195,7 +199,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { this.ignoreExpired = ignoreExpired; this.taskHash = taskHash; this.skipPrimaryCheck = skipPrimaryCheck; - this.localCache = locCache; + this.locCache = locCache; + this.keepBinary = keepBinary; cacheId = CU.cacheId(cacheName); } @@ -216,6 +221,18 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { } /** {@inheritDoc} */ + @Override public boolean keepBinary() { + return keepBinary; + } + + /** + * @param keepBinary Keep binary flag. + */ + public void keepBinary(boolean keepBinary) { + this.keepBinary = keepBinary; + } + + /** {@inheritDoc} */ @Override public String cacheName() { return cacheName; } @@ -284,6 +301,11 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { } } + /** {@inheritDoc} */ + @Override public boolean keepBinary() { + return keepBinary; + } + @Override public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary, boolean recordIgniteEvt) { if (ignoreExpired && evt.getEventType() == EventType.EXPIRED) @@ -317,7 +339,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { if (primary || skipPrimaryCheck) { if (loc) { - if (!localCache) { + if (!locCache) { Collection<CacheContinuousQueryEntry> entries = handleEvent(ctx, entry); if (!entries.isEmpty()) { @@ -848,7 +870,6 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { /** * @param e Entry. - * @param topVer Topology version. * @return Continuous query entry. */ private CacheContinuousQueryEntry skipEntry(CacheContinuousQueryEntry e) { http://git-wip-us.apache.org/repos/asf/ignite/blob/01c24e7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java index 8342acf..86abbef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java @@ -81,6 +81,11 @@ interface CacheContinuousQueryListener<K, V> { public boolean oldValueRequired(); /** + * @return Keep binary flag. + */ + public boolean keepBinary(); + + /** * @return Whether to notify on existing entries. */ public boolean notifyExisting(); http://git-wip-us.apache.org/repos/asf/ignite/blob/01c24e7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index b2e7490..0e4cb40 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -135,7 +135,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { if (cfgs != null) { for (CacheEntryListenerConfiguration cfg : cfgs) - executeJCacheQuery(cfg, true); + executeJCacheQuery(cfg, true, false); } } @@ -161,21 +161,23 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { */ public void skipUpdateEvent(KeyCacheObject key, int partId, long updCntr, AffinityTopologyVersion topVer) { if (lsnrCnt.get() > 0) { - CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry( - cctx.cacheId(), - UPDATED, - key, - null, - null, - partId, - updCntr, - topVer); + for (CacheContinuousQueryListener lsnr : lsnrs.values()) { + CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry( + cctx.cacheId(), + UPDATED, + key, + null, + null, + lsnr.keepBinary(), + partId, + updCntr, + topVer); - CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>( + CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>( cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0); - for (CacheContinuousQueryListener lsnr : lsnrs.values()) lsnr.skipUpdateEvent(evt, topVer); + } } } @@ -253,6 +255,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { key, newVal, lsnr.oldValueRequired() ? oldVal : null, + lsnr.keepBinary(), partId, updateCntr, topVer); @@ -306,6 +309,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { key, null, lsnr.oldValueRequired() ? oldVal : null, + lsnr.keepBinary(), e.partition(), -1, null); @@ -333,7 +337,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { int bufSize, long timeInterval, boolean autoUnsubscribe, - boolean loc) throws IgniteCheckedException + boolean loc, + boolean keepBinary) throws IgniteCheckedException { return executeQuery0( locLsnr, @@ -346,7 +351,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { true, false, true, - loc); + loc, + keepBinary); } /** @@ -374,7 +380,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { true, false, true, - loc); + loc, + false); } /** @@ -395,9 +402,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { * @param onStart Whether listener is created on node start. * @throws IgniteCheckedException If failed. */ - public void executeJCacheQuery(CacheEntryListenerConfiguration cfg, boolean onStart) + public void executeJCacheQuery(CacheEntryListenerConfiguration cfg, boolean onStart, boolean keepBinary) throws IgniteCheckedException { - JCacheQuery lsnr = new JCacheQuery(cfg, onStart); + JCacheQuery lsnr = new JCacheQuery(cfg, onStart, keepBinary); JCacheQuery old = jCacheLsnrs.putIfAbsent(cfg, lsnr); @@ -471,7 +478,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { boolean oldValRequired, boolean sync, boolean ignoreExpired, - boolean loc) throws IgniteCheckedException + boolean loc, + final boolean keepBinary) throws IgniteCheckedException { cctx.checkSecurity(SecurityPermission.CACHE_READ); @@ -492,7 +500,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { ignoreExpired, taskNameHash, skipPrimaryCheck, - cctx.isLocal()); + cctx.isLocal(), + keepBinary); IgnitePredicate<ClusterNode> pred = (loc || cctx.config().getCacheMode() == CacheMode.LOCAL) ? F.nodeForNodeId(cctx.localNodeId()) : F.<ClusterNode>alwaysTrue(); @@ -550,6 +559,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { e.key(), e.rawGet(), null, + keepBinary, 0, -1, null); @@ -633,15 +643,19 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { private final boolean onStart; /** */ + private final boolean keepBinary; + + /** */ private volatile UUID routineId; /** * @param cfg Listener configuration. * @param onStart {@code True} if executed on cache start. */ - private JCacheQuery(CacheEntryListenerConfiguration cfg, boolean onStart) { + private JCacheQuery(CacheEntryListenerConfiguration cfg, boolean onStart, boolean keepBinary) { this.cfg = cfg; this.onStart = onStart; + this.keepBinary = keepBinary; } /** @@ -694,7 +708,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { cfg.isOldValueRequired(), cfg.isSynchronous(), false, - false); + false, + keepBinary); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/01c24e7d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java index d8698b3..68b83ea 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java @@ -143,6 +143,11 @@ public interface GridContinuousHandler extends Externalizable, Cloneable { public boolean isForQuery(); /** + * @return {@code True} if Ignite Binary objects should be passed to the listener and filter. + */ + public boolean keepBinary(); + + /** * @return Cache name if this is a continuous query handler. */ public String cacheName(); http://git-wip-us.apache.org/repos/asf/ignite/blob/01c24e7d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index e218790..f473d02 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -55,6 +55,7 @@ import org.apache.ignite.internal.processors.GridProcessorAdapter; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -612,7 +613,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter { if (locIncluded && registerHandler(ctx.localNodeId(), routineId, hnd, bufSize, interval, autoUnsubscribe, true)) hnd.onListenerRegistered(routineId, ctx); - ctx.discovery().sendCustomEvent(new StartRoutineDiscoveryMessage(routineId, reqData)); + ctx.discovery().sendCustomEvent(new StartRoutineDiscoveryMessage(routineId, reqData, + reqData.handler().keepBinary())); } catch (IgniteCheckedException e) { startFuts.remove(routineId); @@ -822,6 +824,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter { GridContinuousHandler hnd = data.handler(); + if (req.keepBinary()) { + assert hnd instanceof CacheContinuousQueryHandler; + + ((CacheContinuousQueryHandler)hnd).keepBinary(true); + } + IgniteCheckedException err = null; try { http://git-wip-us.apache.org/repos/asf/ignite/blob/01c24e7d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java index 82c0377..ff037d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java @@ -40,14 +40,18 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage { /** */ private Map<Integer, Long> updateCntrs; + /** Keep binary flag. */ + private boolean keepBinary; + /** * @param routineId Routine id. * @param startReqData Start request data. */ - public StartRoutineDiscoveryMessage(UUID routineId, StartRequestData startReqData) { + public StartRoutineDiscoveryMessage(UUID routineId, StartRequestData startReqData, boolean keepBinary) { super(routineId); this.startReqData = startReqData; + this.keepBinary = keepBinary; } /** @@ -88,6 +92,13 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage { return errs; } + /** + * @return {@code True} if keep binary flag was set on continuous handler. + */ + public boolean keepBinary() { + return keepBinary; + } + /** {@inheritDoc} */ @Override public boolean isMutable() { return true;
