Repository: ignite Updated Branches: refs/heads/ignite-5075-cc ff0a2dd8a -> ab5aead4d
cc Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ab5aead4 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ab5aead4 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ab5aead4 Branch: refs/heads/ignite-5075-cc Commit: ab5aead4dcb651001c362326e6a0b50350b31c2e Parents: ff0a2dd Author: sboikov <[email protected]> Authored: Thu May 25 17:33:46 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu May 25 17:33:46 2017 +0300 ---------------------------------------------------------------------- .../continuous/CacheContinuousQueryEntry.java | 59 ++++++++++---------- .../CacheContinuousQueryEventBuffer.java | 7 +++ 2 files changed, 35 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ab5aead4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java index 9db92b2..28fdee3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java @@ -51,6 +51,9 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { private static final byte FILTERED_ENTRY = 0b0010; /** */ + private static final byte KEEP_BINARY = 0b0100; + + /** */ private static final EventType[] EVT_TYPE_VALS = EventType.values(); /** @@ -105,9 +108,6 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { @GridToStringInclude private AffinityTopologyVersion topVer; - /** Keep binary. */ - private boolean keepBinary; - /** */ private long filteredCnt; @@ -124,6 +124,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { * @param key Key. * @param newVal New value. * @param oldVal Old value. + * @param keepBinary Keep binary flag. * @param part Partition. * @param updateCntr Update partition counter. * @param topVer Topology version if applicable. @@ -146,7 +147,9 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { this.part = part; this.updateCntr = updateCntr; this.topVer = topVer; - this.keepBinary = keepBinary; + + if (keepBinary) + flags |= KEEP_BINARY; } /** @@ -231,7 +234,15 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { return this; CacheContinuousQueryEntry e = new CacheContinuousQueryEntry( - cacheId, null, null, null, null, keepBinary, part, updateCntr, topVer); + cacheId, + null, + null, + null, + null, + false, + part, + updateCntr, + topVer); e.flags = flags; @@ -256,7 +267,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { * @return Keep binary flag. */ boolean isKeepBinary() { - return keepBinary; + return (flags & KEEP_BINARY) != 0; } /** @@ -370,42 +381,36 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { writer.incrementState(); case 4: - if (!writer.writeBoolean("keepBinary", keepBinary)) - return false; - - writer.incrementState(); - - case 5: if (!writer.writeMessage("key", isFiltered() ? null : key)) return false; writer.incrementState(); - case 6: + case 5: if (!writer.writeMessage("newVal", isFiltered() ? null : newVal)) return false; writer.incrementState(); - case 7: + case 6: if (!writer.writeMessage("oldVal", isFiltered() ? null : oldVal)) return false; writer.incrementState(); - case 8: + case 7: if (!writer.writeInt("part", part)) return false; writer.incrementState(); - case 9: + case 8: if (!writer.writeMessage("topVer", topVer)) return false; writer.incrementState(); - case 10: + case 9: if (!writer.writeLong("updateCntr", updateCntr)) return false; @@ -457,14 +462,6 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { reader.incrementState(); case 4: - keepBinary = reader.readBoolean("keepBinary"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 5: key = reader.readMessage("key"); if (!reader.isLastRead()) @@ -472,7 +469,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { reader.incrementState(); - case 6: + case 5: newVal = reader.readMessage("newVal"); if (!reader.isLastRead()) @@ -480,7 +477,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { reader.incrementState(); - case 7: + case 6: oldVal = reader.readMessage("oldVal"); if (!reader.isLastRead()) @@ -488,7 +485,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { reader.incrementState(); - case 8: + case 7: part = reader.readInt("part"); if (!reader.isLastRead()) @@ -496,7 +493,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { reader.incrementState(); - case 9: + case 8: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -504,7 +501,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { reader.incrementState(); - case 10: + case 9: updateCntr = reader.readLong("updateCntr"); if (!reader.isLastRead()) @@ -519,7 +516,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 11; + return 10; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/ab5aead4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java index f496c8c..fd4029c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java @@ -96,6 +96,13 @@ public class CacheContinuousQueryEventBuffer { else ret = entries; + if (!pending.isEmpty()) { + if (ret == null) + ret = new ArrayList<>(); + + ret.addAll(pending.values()); + } + return ret; }
