Moved the near-cache-related fields of GridNearAtomicUpdateResponse into a separate class (NearCacheUpdates).
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/81c1964b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/81c1964b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/81c1964b Branch: refs/heads/ignite-3477-master Commit: 81c1964b8a2e8af56747e173003fed97c58fcbfa Parents: 837bae6 Author: sboikov <[email protected]> Authored: Mon Mar 20 12:46:32 2017 +0300 Committer: sboikov <[email protected]> Committed: Mon Mar 20 12:46:32 2017 +0300 ---------------------------------------------------------------------- .../communication/GridIoMessageFactory.java | 6 + .../atomic/GridNearAtomicUpdateResponse.java | 192 ++--------- .../dht/atomic/NearCacheUpdates.java | 332 +++++++++++++++++++ 3 files changed, 374 insertions(+), 156 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81c1964b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 0548581..07e8941 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -77,6 +77,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNe import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest; import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.NearCacheUpdates; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.UpdateErrors; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse; @@ -176,6 +177,11 @@ public class GridIoMessageFactory implements MessageFactory { Message msg = null; switch (type) { + case -48: + msg = new NearCacheUpdates(); + + break; + case -47: msg = new GridNearAtomicCheckUpdateRequest(); http://git-wip-us.apache.org/repos/asf/ignite/blob/81c1964b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java index 4e20fc7..8b52ba8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; import java.io.Externalizable; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.UUID; @@ -36,7 +35,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import 
org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; @@ -71,27 +69,8 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr /** */ private AffinityTopologyVersion remapTopVer; - /** Indexes of keys for which values were generated on primary node (used if originating node has near cache). */ - @GridDirectCollection(int.class) - private List<Integer> nearValsIdxs; - - /** Indexes of keys for which update was skipped (used if originating node has near cache). */ - @GridDirectCollection(int.class) - private List<Integer> nearSkipIdxs; - - /** Values generated on primary node which should be put to originating node's near cache. */ - @GridToStringInclude - @GridDirectCollection(CacheObject.class) - private List<CacheObject> nearVals; - - /** Version generated on primary node to be used for originating node's near cache update. */ - private GridCacheVersion nearVer; - - /** Near TTLs. */ - private GridLongList nearTtls; - - /** Near expire times. */ - private GridLongList nearExpireTimes; + /** Data for near cache update. */ + private NearCacheUpdates nearUpdates; /** Partition ID. */ private int partId = -1; @@ -235,6 +214,14 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr } /** + * + */ + private void initNearUpdates() { + if (nearUpdates == null) + nearUpdates = new NearCacheUpdates(); + } + + /** * Adds value to be put in near cache on originating node. * * @param keyIdx Key index. 
@@ -246,15 +233,9 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr @Nullable CacheObject val, long ttl, long expireTime) { - if (nearValsIdxs == null) { - nearValsIdxs = new ArrayList<>(); - nearVals = new ArrayList<>(); - } - - addNearTtl(keyIdx, ttl, expireTime); + initNearUpdates(); - nearValsIdxs.add(keyIdx); - nearVals.add(val); + nearUpdates.addNearValue(keyIdx, val, ttl, expireTime); } /** @@ -264,29 +245,9 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr */ @SuppressWarnings("ForLoopReplaceableByForEach") void addNearTtl(int keyIdx, long ttl, long expireTime) { - if (ttl >= 0) { - if (nearTtls == null) { - nearTtls = new GridLongList(16); - - for (int i = 0; i < keyIdx; i++) - nearTtls.add(-1L); - } - } - - if (nearTtls != null) - nearTtls.add(ttl); - - if (expireTime >= 0) { - if (nearExpireTimes == null) { - nearExpireTimes = new GridLongList(16); + initNearUpdates(); - for (int i = 0; i < keyIdx; i++) - nearExpireTimes.add(-1); - } - } - - if (nearExpireTimes != null) - nearExpireTimes.add(expireTime); + nearUpdates.addNearTtl(keyIdx, ttl, expireTime); } /** @@ -294,13 +255,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr * @return Expire time for near cache update. */ public long nearExpireTime(int idx) { - if (nearExpireTimes != null) { - assert idx >= 0 && idx < nearExpireTimes.size(); - - return nearExpireTimes.get(idx); - } - - return -1L; + return nearUpdates != null ? nearUpdates.nearExpireTime(idx) : -1L; } /** @@ -308,53 +263,46 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr * @return TTL for near cache update. */ public long nearTtl(int idx) { - if (nearTtls != null) { - assert idx >= 0 && idx < nearTtls.size(); - - return nearTtls.get(idx); - } - - return -1L; + return nearUpdates != null ? 
nearUpdates.nearTtl(idx) : -1L; } /** * @param nearVer Version generated on primary node to be used for originating node's near cache update. */ void nearVersion(GridCacheVersion nearVer) { - this.nearVer = nearVer; + initNearUpdates(); + + nearUpdates.nearVersion(nearVer); } /** * @return Version generated on primary node to be used for originating node's near cache update. */ public GridCacheVersion nearVersion() { - return nearVer; + return nearUpdates != null ? nearUpdates.nearVersion() : null; } /** * @param keyIdx Index of key for which update was skipped */ void addSkippedIndex(int keyIdx) { - if (nearSkipIdxs == null) - nearSkipIdxs = new ArrayList<>(); - - nearSkipIdxs.add(keyIdx); + initNearUpdates(); - addNearTtl(keyIdx, -1L, -1L); + nearUpdates.addSkippedIndex(keyIdx); } /** * @return Indexes of keys for which update was skipped */ @Nullable public List<Integer> skippedIndexes() { - return nearSkipIdxs; + return nearUpdates != null ? nearUpdates.skippedIndexes() : null; } /** * @return Indexes of keys for which values were generated on primary node. */ @Nullable public List<Integer> nearValuesIndexes() { - return nearValsIdxs; + return nearUpdates != null ? nearUpdates.nearValuesIndexes() : null; } /** @@ -362,7 +310,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr * @return Value generated on primary node which should be put to originating node's near cache. */ @Nullable public CacheObject nearValue(int idx) { - return nearVals.get(idx); + return nearUpdates != null ? 
nearUpdates.nearValue(idx) : null; } /** @@ -401,7 +349,8 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr if (errs != null) errs.prepareMarshal(this, cctx); - prepareMarshalCacheObjects(nearVals, cctx); + if (nearUpdates != null) + prepareMarshalCacheObjects(nearUpdates.nearValues(), cctx); if (ret != null) ret.prepareMarshal(cctx); @@ -416,7 +365,8 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr if (errs != null) errs.finishUnmarshal(this, cctx, ldr); - finishUnmarshalCacheObjects(nearVals, cctx, ldr); + if (nearUpdates != null) + finishUnmarshalCacheObjects(nearUpdates.nearValues(), cctx, ldr); if (ret != null) ret.finishUnmarshal(cctx, ldr); @@ -471,54 +421,24 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr writer.incrementState(); case 6: - if (!writer.writeMessage("nearExpireTimes", nearExpireTimes)) + if (!writer.writeMessage("nearUpdates", nearUpdates)) return false; writer.incrementState(); case 7: - if (!writer.writeCollection("nearSkipIdxs", nearSkipIdxs, MessageCollectionItemType.INT)) - return false; - - writer.incrementState(); - - case 8: - if (!writer.writeMessage("nearTtls", nearTtls)) - return false; - - writer.incrementState(); - - case 9: - if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG)) - return false; - - writer.incrementState(); - - case 10: - if (!writer.writeCollection("nearValsIdxs", nearValsIdxs, MessageCollectionItemType.INT)) - return false; - - writer.incrementState(); - - case 11: - if (!writer.writeMessage("nearVer", nearVer)) - return false; - - writer.incrementState(); - - case 12: if (!writer.writeInt("partId", partId)) return false; writer.incrementState(); - case 13: + case 8: if (!writer.writeMessage("remapTopVer", remapTopVer)) return false; writer.incrementState(); - case 14: + case 9: if (!writer.writeMessage("ret", ret)) return false; @@ -565,7 +485,7 @@ public class 
GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr reader.incrementState(); case 6: - nearExpireTimes = reader.readMessage("nearExpireTimes"); + nearUpdates = reader.readMessage("nearUpdates"); if (!reader.isLastRead()) return false; @@ -573,46 +493,6 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr reader.incrementState(); case 7: - nearSkipIdxs = reader.readCollection("nearSkipIdxs", MessageCollectionItemType.INT); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 8: - nearTtls = reader.readMessage("nearTtls"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 9: - nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 10: - nearValsIdxs = reader.readCollection("nearValsIdxs", MessageCollectionItemType.INT); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 11: - nearVer = reader.readMessage("nearVer"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 12: partId = reader.readInt("partId"); if (!reader.isLastRead()) @@ -620,7 +500,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr reader.incrementState(); - case 13: + case 8: remapTopVer = reader.readMessage("remapTopVer"); if (!reader.isLastRead()) @@ -628,7 +508,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr reader.incrementState(); - case 14: + case 9: ret = reader.readMessage("ret"); if (!reader.isLastRead()) @@ -648,7 +528,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 15; + return 10; } /** {@inheritDoc} */ 
http://git-wip-us.apache.org/repos/asf/ignite/blob/81c1964b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearCacheUpdates.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearCacheUpdates.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearCacheUpdates.java new file mode 100644 index 0000000..f4ecedf --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearCacheUpdates.java @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import org.apache.ignite.internal.GridDirectCollection; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.GridLongList; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class NearCacheUpdates implements Message { + /** Indexes of keys for which values were generated on primary node (used if originating node has near cache). */ + @GridDirectCollection(int.class) + private List<Integer> nearValsIdxs; + + /** Indexes of keys for which update was skipped (used if originating node has near cache). */ + @GridDirectCollection(int.class) + private List<Integer> nearSkipIdxs; + + /** Values generated on primary node which should be put to originating node's near cache. */ + @GridToStringInclude + @GridDirectCollection(CacheObject.class) + private List<CacheObject> nearVals; + + /** Version generated on primary node to be used for originating node's near cache update. */ + private GridCacheVersion nearVer; + + /** Near TTLs. */ + private GridLongList nearTtls; + + /** Near expire times. */ + private GridLongList nearExpireTimes; + + /** + * @return Values. + */ + List<CacheObject> nearValues() { + return nearVals; + } + + /** + * Adds value to be put in near cache on originating node. + * + * @param keyIdx Key index. + * @param val Value. 
+ * @param ttl TTL for near cache update. + * @param expireTime Expire time for near cache update. + */ + void addNearValue(int keyIdx, + @Nullable CacheObject val, + long ttl, + long expireTime) { + if (nearValsIdxs == null) { + nearValsIdxs = new ArrayList<>(); + nearVals = new ArrayList<>(); + } + + addNearTtl(keyIdx, ttl, expireTime); + + nearValsIdxs.add(keyIdx); + nearVals.add(val); + } + + /** + * @param keyIdx Key index. + * @param ttl TTL for near cache update. + * @param expireTime Expire time for near cache update. + */ + @SuppressWarnings("ForLoopReplaceableByForEach") + void addNearTtl(int keyIdx, long ttl, long expireTime) { + if (ttl >= 0) { + if (nearTtls == null) { + nearTtls = new GridLongList(16); + + for (int i = 0; i < keyIdx; i++) + nearTtls.add(-1L); + } + } + + if (nearTtls != null) + nearTtls.add(ttl); + + if (expireTime >= 0) { + if (nearExpireTimes == null) { + nearExpireTimes = new GridLongList(16); + + for (int i = 0; i < keyIdx; i++) + nearExpireTimes.add(-1); + } + } + + if (nearExpireTimes != null) + nearExpireTimes.add(expireTime); + } + + /** + * @param idx Index. + * @return Expire time for near cache update. + */ + long nearExpireTime(int idx) { + if (nearExpireTimes != null) { + assert idx >= 0 && idx < nearExpireTimes.size(); + + return nearExpireTimes.get(idx); + } + + return -1L; + } + + /** + * @param idx Index. + * @return TTL for near cache update. + */ + long nearTtl(int idx) { + if (nearTtls != null) { + assert idx >= 0 && idx < nearTtls.size(); + + return nearTtls.get(idx); + } + + return -1L; + } + + /** + * @param nearVer Version generated on primary node to be used for originating node's near cache update. + */ + void nearVersion(GridCacheVersion nearVer) { + this.nearVer = nearVer; + } + + /** + * @return Version generated on primary node to be used for originating node's near cache update. 
+ */ + GridCacheVersion nearVersion() { + return nearVer; + } + + /** + * @param keyIdx Index of key for which update was skipped + */ + void addSkippedIndex(int keyIdx) { + if (nearSkipIdxs == null) + nearSkipIdxs = new ArrayList<>(); + + nearSkipIdxs.add(keyIdx); + + addNearTtl(keyIdx, -1L, -1L); + } + + /** + * @return Indexes of keys for which update was skipped + */ + @Nullable List<Integer> skippedIndexes() { + return nearSkipIdxs; + } + + /** + * @return Indexes of keys for which values were generated on primary node. + */ + @Nullable List<Integer> nearValuesIndexes() { + return nearValsIdxs; + } + + /** + * @param idx Index. + * @return Value generated on primary node which should be put to originating node's near cache. + */ + @Nullable CacheObject nearValue(int idx) { + return nearVals.get(idx); + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeMessage("nearExpireTimes", nearExpireTimes)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeCollection("nearSkipIdxs", nearSkipIdxs, MessageCollectionItemType.INT)) + return false; + + writer.incrementState(); + + case 2: + if (!writer.writeMessage("nearTtls", nearTtls)) + return false; + + writer.incrementState(); + + case 3: + if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + case 4: + if (!writer.writeCollection("nearValsIdxs", nearValsIdxs, MessageCollectionItemType.INT)) + return false; + + writer.incrementState(); + + case 5: + if (!writer.writeMessage("nearVer", nearVer)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, 
MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + nearExpireTimes = reader.readMessage("nearExpireTimes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + nearSkipIdxs = reader.readCollection("nearSkipIdxs", MessageCollectionItemType.INT); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 2: + nearTtls = reader.readMessage("nearTtls"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 3: + nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 4: + nearValsIdxs = reader.readCollection("nearValsIdxs", MessageCollectionItemType.INT); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 5: + nearVer = reader.readMessage("nearVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(NearCacheUpdates.class); + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return -48; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 6; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(NearCacheUpdates.class, this); + } +}
