http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java index 0000000,0ea9769..73fcb03 mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java @@@ -1,0 -1,339 +1,333 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; + + import org.apache.ignite.*; + import org.apache.ignite.internal.*; + import org.apache.ignite.internal.processors.cache.*; + import org.apache.ignite.lang.*; + import org.apache.ignite.internal.util.direct.*; + import org.apache.ignite.internal.util.tostring.*; + import org.apache.ignite.internal.util.typedef.internal.*; + + import java.io.*; + import java.nio.*; + import java.util.*; + + /** + * Force keys response. Contains absent keys. + */ + public class GridDhtForceKeysResponse<K, V> extends GridCacheMessage<K, V> implements GridCacheDeployable { + /** */ + private static final long serialVersionUID = 0L; + + /** Future ID. */ + private IgniteUuid futId; + + /** Mini-future ID. */ + private IgniteUuid miniId; + + /** */ + @GridDirectCollection(byte[].class) + private Collection<byte[]> missedKeyBytes; + + /** Missed (not found) keys. */ + @GridToStringInclude + @GridDirectTransient + private Collection<K> missedKeys; + + /** Cache entries. */ + @GridToStringInclude + @GridDirectTransient + private List<GridCacheEntryInfo<K, V>> infos; + + /** */ + private byte[] infosBytes; + + /** + * Required by {@link Externalizable}. + */ + public GridDhtForceKeysResponse() { + // No-op. + } + + /** + * @param cacheId Cache ID. + * @param futId Request id. + * @param miniId Mini-future ID. + */ + public GridDhtForceKeysResponse(int cacheId, IgniteUuid futId, IgniteUuid miniId) { + assert futId != null; + assert miniId != null; + + this.cacheId = cacheId; + this.futId = futId; + this.miniId = miniId; + } + + /** {@inheritDoc} */ + @Override public boolean allowForStartup() { + return true; + } + + /** + * @return Keys. + */ + public Collection<K> missedKeys() { + return missedKeys == null ? Collections.<K>emptyList() : missedKeys; + } + + /** + * @return Forced entries. + */ + public Collection<GridCacheEntryInfo<K, V>> forcedInfos() { + return infos == null ? Collections.<GridCacheEntryInfo<K,V>>emptyList() : infos; + } + + /** + * @return Future ID. + */ + public IgniteUuid futureId() { + return futId; + } + + /** + * @return Mini-future ID. + */ + public IgniteUuid miniId() { + return miniId; + } + + /** + * @param key Key. + */ + public void addMissed(K key) { + if (missedKeys == null) + missedKeys = new ArrayList<>(); + + missedKeys.add(key); + } + + /** + * @param info Entry info to add. + */ + public void addInfo(GridCacheEntryInfo<K, V> info) { + assert info != null; + + if (infos == null) + infos = new ArrayList<>(); + + infos.add(info); + } + + /** {@inheritDoc} + * @param ctx*/ + @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException { + super.prepareMarshal(ctx); + + if (missedKeys != null && missedKeyBytes == null) + missedKeyBytes = marshalCollection(missedKeys, ctx); + + if (infos != null) { + marshalInfos(infos, ctx); + + infosBytes = ctx.marshaller().marshal(infos); + } + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException { + super.finishUnmarshal(ctx, ldr); + + if (missedKeys == null && missedKeyBytes != null) + missedKeys = unmarshalCollection(missedKeyBytes, ctx, ldr); + + if (infosBytes != null) { + infos = ctx.marshaller().unmarshal(infosBytes, ldr); + + unmarshalInfos(infos, ctx.cacheContext(cacheId()), ldr); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) + @Override public GridTcpCommunicationMessageAdapter clone() { + GridDhtForceKeysResponse _clone = new GridDhtForceKeysResponse(); + + clone0(_clone); + + return _clone; + } + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { + super.clone0(_msg); + + GridDhtForceKeysResponse _clone = (GridDhtForceKeysResponse)_msg; + + _clone.futId = futId; + _clone.miniId = miniId; + _clone.missedKeyBytes = missedKeyBytes; + _clone.missedKeys = missedKeys; + _clone.infos = infos; + _clone.infosBytes = infosBytes; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.writeTo(buf)) + return false; + + if (!commState.typeWritten) { - if (!commState.putByte(directType())) ++ if (!commState.putByte(null, directType())) + return false; + + commState.typeWritten = true; + } + + switch (commState.idx) { + case 3: - if (!commState.putGridUuid(futId)) ++ if (!commState.putGridUuid("futId", futId)) + return false; + + commState.idx++; + + case 4: - if (!commState.putByteArray(infosBytes)) ++ if (!commState.putByteArray("infosBytes", infosBytes)) + return false; + + commState.idx++; + + case 5: - if (!commState.putGridUuid(miniId)) ++ if (!commState.putGridUuid("miniId", miniId)) + return false; + + commState.idx++; + + case 6: + if (missedKeyBytes != null) { + if (commState.it == null) { - if (!commState.putInt(missedKeyBytes.size())) ++ if (!commState.putInt(null, missedKeyBytes.size())) + return false; + + commState.it = missedKeyBytes.iterator(); + } + + while (commState.it.hasNext() || commState.cur != NULL) { + if (commState.cur == NULL) + commState.cur = commState.it.next(); + - if (!commState.putByteArray((byte[])commState.cur)) ++ if (!commState.putByteArray(null, (byte[])commState.cur)) + return false; + + commState.cur = NULL; + } + + commState.it = null; + } else { - if (!commState.putInt(-1)) ++ if (!commState.putInt(null, -1)) + return false; + } + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean readFrom(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.readFrom(buf)) + return false; + + switch (commState.idx) { + case 3: - IgniteUuid futId0 = commState.getGridUuid(); ++ futId = commState.getGridUuid("futId"); + - if (futId0 == GRID_UUID_NOT_READ) ++ if (!commState.lastRead()) + return false; + - futId = futId0; - + commState.idx++; + + case 4: - byte[] infosBytes0 = commState.getByteArray(); ++ infosBytes = commState.getByteArray("infosBytes"); + - if (infosBytes0 == BYTE_ARR_NOT_READ) ++ if (!commState.lastRead()) + return false; + - infosBytes = infosBytes0; - + commState.idx++; + + case 5: - IgniteUuid miniId0 = commState.getGridUuid(); ++ miniId = commState.getGridUuid("miniId"); + - if (miniId0 == GRID_UUID_NOT_READ) ++ if (!commState.lastRead()) + return false; + - miniId = miniId0; - + commState.idx++; + + case 6: + if (commState.readSize == -1) { - if (buf.remaining() < 4) - return false; ++ commState.readSize = commState.getInt(null); + - commState.readSize = commState.getInt(); ++ if (!commState.lastRead()) ++ return false; + } + + if (commState.readSize >= 0) { + if (missedKeyBytes == null) + missedKeyBytes = new ArrayList<>(commState.readSize); + + for (int i = commState.readItems; i < commState.readSize; i++) { - byte[] _val = commState.getByteArray(); ++ byte[] _val = commState.getByteArray(null); + - if (_val == BYTE_ARR_NOT_READ) ++ if (!commState.lastRead()) + return false; + + missedKeyBytes.add((byte[])_val); + + commState.readItems++; + } + } + + commState.readSize = -1; + commState.readItems = 0; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 42; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridDhtForceKeysResponse.class, this, super.toString()); + } + }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java index 0000000,29568c9..bf41e14 mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java @@@ -1,0 -1,390 +1,388 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; + + import org.apache.ignite.*; + import org.apache.ignite.internal.*; + import org.apache.ignite.internal.processors.cache.*; + import org.apache.ignite.internal.util.direct.*; + import org.apache.ignite.internal.util.tostring.*; + import org.apache.ignite.internal.util.typedef.internal.*; + + import java.io.*; + import java.nio.*; + import java.util.*; + + /** + * Partition demand request. + */ + public class GridDhtPartitionDemandMessage<K, V> extends GridCacheMessage<K, V> { + /** */ + private static final long serialVersionUID = 0L; + + /** Update sequence. */ + private long updateSeq; + + /** Partition. */ + @GridToStringInclude + @GridDirectCollection(int.class) + private Set<Integer> parts; + + /** Topic. */ + @GridDirectTransient + private Object topic; + + /** Serialized topic. */ + private byte[] topicBytes; + + /** Timeout. */ + private long timeout; + + /** Worker ID. */ + private int workerId = -1; + + /** Topology version. */ + private long topVer; + + /** + * @param updateSeq Update sequence for this node. + * @param topVer Topology version. + */ + GridDhtPartitionDemandMessage(long updateSeq, long topVer, int cacheId) { + assert updateSeq > 0; + + this.cacheId = cacheId; + this.updateSeq = updateSeq; + this.topVer = topVer; + } + + /** + * @param cp Message to copy from. + */ + GridDhtPartitionDemandMessage(GridDhtPartitionDemandMessage<K, V> cp, Collection<Integer> parts) { + cacheId = cp.cacheId; + updateSeq = cp.updateSeq; + topic = cp.topic; + timeout = cp.timeout; + workerId = cp.workerId; + topVer = cp.topVer; + + // Create a copy of passed in collection since it can be modified when this message is being sent. + this.parts = new HashSet<>(parts); + } + + /** + * Empty constructor required for {@link Externalizable}. + */ + public GridDhtPartitionDemandMessage() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean allowForStartup() { + return true; + } + + /** + * @param p Partition. + */ + void addPartition(int p) { + if (parts == null) + parts = new HashSet<>(); + + parts.add(p); + } + + + /** + * @return Partition. + */ + Set<Integer> partitions() { + return parts; + } + + /** + * @return Update sequence. + */ + long updateSequence() { + return updateSeq; + } + + /** + * @return Reply message timeout. + */ + long timeout() { + return timeout; + } + + /** + * @param timeout Timeout. + */ + void timeout(long timeout) { + this.timeout = timeout; + } + + /** + * @return Topic. + */ + Object topic() { + return topic; + } + + /** + * @param topic Topic. + */ + void topic(Object topic) { + this.topic = topic; + } + + /** + * @return Worker ID. + */ + int workerId() { + return workerId; + } + + /** + * @param workerId Worker ID. + */ + void workerId(int workerId) { + this.workerId = workerId; + } + + /** + * @return Topology version for which demand message is sent. + */ + @Override public long topologyVersion() { + return topVer; + } + + /** {@inheritDoc} + * @param ctx*/ + @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException { + super.prepareMarshal(ctx); + + if (topic != null) + topicBytes = ctx.marshaller().marshal(topic); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException { + super.finishUnmarshal(ctx, ldr); + + if (topicBytes != null) + topic = ctx.marshaller().unmarshal(topicBytes, ldr); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) + @Override public GridTcpCommunicationMessageAdapter clone() { + GridDhtPartitionDemandMessage _clone = new GridDhtPartitionDemandMessage(); + + clone0(_clone); + + return _clone; + } + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { + super.clone0(_msg); + + GridDhtPartitionDemandMessage _clone = (GridDhtPartitionDemandMessage)_msg; + + _clone.updateSeq = updateSeq; + _clone.parts = parts; + _clone.topic = topic; + _clone.topicBytes = topicBytes; + _clone.timeout = timeout; + _clone.workerId = workerId; + _clone.topVer = topVer; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.writeTo(buf)) + return false; + + if (!commState.typeWritten) { - if (!commState.putByte(directType())) ++ if (!commState.putByte(null, directType())) + return false; + + commState.typeWritten = true; + } + + switch (commState.idx) { + case 3: + if (parts != null) { + if (commState.it == null) { - if (!commState.putInt(parts.size())) ++ if (!commState.putInt(null, parts.size())) + return false; + + commState.it = parts.iterator(); + } + + while (commState.it.hasNext() || commState.cur != NULL) { + if (commState.cur == NULL) + commState.cur = commState.it.next(); + - if (!commState.putInt((int)commState.cur)) ++ if (!commState.putInt(null, (int)commState.cur)) + return false; + + commState.cur = NULL; + } + + commState.it = null; + } else { - if (!commState.putInt(-1)) ++ if (!commState.putInt(null, -1)) + return false; + } + + commState.idx++; + + case 4: - if (!commState.putLong(timeout)) ++ if (!commState.putLong("timeout", timeout)) + return false; + + commState.idx++; + + case 5: - if (!commState.putLong(topVer)) ++ if (!commState.putLong("topVer", topVer)) + return false; + + commState.idx++; + + case 6: - if (!commState.putByteArray(topicBytes)) ++ if (!commState.putByteArray("topicBytes", topicBytes)) + return false; + + commState.idx++; + + case 7: - if (!commState.putLong(updateSeq)) ++ if (!commState.putLong("updateSeq", updateSeq)) + return false; + + commState.idx++; + + case 8: - if (!commState.putInt(workerId)) ++ if (!commState.putInt("workerId", workerId)) + return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean readFrom(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.readFrom(buf)) + return false; + + switch (commState.idx) { + case 3: + if (commState.readSize == -1) { - if (buf.remaining() < 4) - return false; ++ commState.readSize = commState.getInt(null); + - commState.readSize = commState.getInt(); ++ if (!commState.lastRead()) ++ return false; + } + + if (commState.readSize >= 0) { + if (parts == null) + parts = new HashSet<>(commState.readSize); + + for (int i = commState.readItems; i < commState.readSize; i++) { - if (buf.remaining() < 4) - return false; ++ int _val = commState.getInt(null); + - int _val = commState.getInt(); ++ if (!commState.lastRead()) ++ return false; + + parts.add((Integer)_val); + + commState.readItems++; + } + } + + commState.readSize = -1; + commState.readItems = 0; + + commState.idx++; + + case 4: - if (buf.remaining() < 8) - return false; ++ timeout = commState.getLong("timeout"); + - timeout = commState.getLong(); ++ if (!commState.lastRead()) ++ return false; + + commState.idx++; + + case 5: - if (buf.remaining() < 8) - return false; ++ topVer = commState.getLong("topVer"); + - topVer = commState.getLong(); ++ if (!commState.lastRead()) ++ return false; + + commState.idx++; + + case 6: - byte[] topicBytes0 = commState.getByteArray(); ++ topicBytes = commState.getByteArray("topicBytes"); + - if (topicBytes0 == BYTE_ARR_NOT_READ) ++ if (!commState.lastRead()) + return false; + - topicBytes = topicBytes0; - + commState.idx++; + + case 7: - if (buf.remaining() < 8) - return false; ++ updateSeq = commState.getLong("updateSeq"); + - updateSeq = commState.getLong(); ++ if (!commState.lastRead()) ++ return false; + + commState.idx++; + + case 8: - if (buf.remaining() < 4) - return false; ++ workerId = commState.getInt("workerId"); + - workerId = commState.getInt(); ++ if (!commState.lastRead()) ++ return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 43; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridDhtPartitionDemandMessage.class, this, "partCnt", parts.size(), "super", + super.toString()); + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java index 0000000,b1c6f45..417ab84 mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java @@@ -1,0 -1,519 +1,517 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; + + import org.apache.ignite.*; + import org.apache.ignite.internal.*; + import org.apache.ignite.internal.processors.cache.*; + import org.apache.ignite.internal.util.direct.*; + import org.apache.ignite.internal.util.tostring.*; + import org.apache.ignite.internal.util.typedef.internal.*; + + import java.io.*; + import java.nio.*; + import java.util.*; + + /** + * Partition supply message. + */ + public class GridDhtPartitionSupplyMessage<K, V> extends GridCacheMessage<K, V> implements GridCacheDeployable { + /** */ + private static final long serialVersionUID = 0L; + + /** Worker ID. */ + private int workerId = -1; + + /** Update sequence. */ + private long updateSeq; + + /** Acknowledgement flag. */ + private boolean ack; + + /** Partitions that have been fully sent. */ + @GridDirectCollection(int.class) + private Set<Integer> last; + + /** Partitions which were not found. */ + @GridToStringInclude + @GridDirectCollection(int.class) + private Set<Integer> missed; + + /** Entries. */ + @GridDirectTransient + private Map<Integer, Collection<GridCacheEntryInfo<K, V>>> infos = new HashMap<>(); + + /** Cache entries in serialized form. */ + @GridToStringExclude + @GridDirectTransient + private Map<Integer, Collection<byte[]>> infoBytesMap = new HashMap<>(); + + /** */ + private byte[] infoBytes; + + /** Message size. */ + @GridDirectTransient + private int msgSize; + + /** + * @param workerId Worker ID. + * @param updateSeq Update sequence for this node. + */ + GridDhtPartitionSupplyMessage(int workerId, long updateSeq, int cacheId) { + assert workerId >= 0; + assert updateSeq > 0; + + this.cacheId = cacheId; + this.updateSeq = updateSeq; + this.workerId = workerId; + } + + /** + * Empty constructor required for {@link Externalizable}. + */ + public GridDhtPartitionSupplyMessage() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean allowForStartup() { + return true; + } + + /** {@inheritDoc} */ + @Override public boolean ignoreClassErrors() { + return true; + } + + /** + * @return Worker ID. + */ + int workerId() { + return workerId; + } + + /** + * @return Update sequence. + */ + long updateSequence() { + return updateSeq; + } + + /** + * Marks this message for acknowledgment. + */ + void markAck() { + ack = true; + } + + /** + * @return Acknowledgement flag. + */ + boolean ack() { + return ack; + } + + /** + * @return Flag to indicate last message for partition. + */ + Set<Integer> last() { + return last == null ? Collections.<Integer>emptySet() : last; + } + + /** + * @param p Partition which was fully sent. + */ + void last(int p) { + if (last == null) + last = new HashSet<>(); + + if (last.add(p)) { + msgSize += 4; + + // If partition is empty, we need to add it. + Collection<byte[]> serInfo = infoBytesMap.get(p); + + if (serInfo == null) + infoBytesMap.put(p, new LinkedList<byte[]>()); + } + } + + /** + * @param p Missed partition. + */ + void missed(int p) { + if (missed == null) + missed = new HashSet<>(); + + if (missed.add(p)) + msgSize += 4; + } + + /** + * @return Missed partitions. + */ + Set<Integer> missed() { + return missed == null ? Collections.<Integer>emptySet() : missed; + } + + /** + * @return Entries. + */ + Map<Integer, Collection<GridCacheEntryInfo<K, V>>> infos() { + return infos; + } + + /** + * @return Message size. + */ + int messageSize() { + return msgSize; + } + + /** + * @param p Partition. + * @param info Entry to add. + * @param ctx Cache context. + * @throws IgniteCheckedException If failed. + */ + void addEntry(int p, GridCacheEntryInfo<K, V> info, GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException { + assert info != null; + + marshalInfo(info, ctx); + + byte[] bytes = CU.marshal(ctx, info); + + msgSize += bytes.length; + + Collection<byte[]> serInfo = infoBytesMap.get(p); + + if (serInfo == null) { + msgSize += 4; + + infoBytesMap.put(p, serInfo = new LinkedList<>()); + } + + serInfo.add(bytes); + } + + /** + * @param p Partition. + * @param info Entry to add. + * @param ctx Cache context. + * @throws IgniteCheckedException If failed. + */ + void addEntry0(int p, GridCacheEntryInfo<K, V> info, GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException { + assert info != null; + assert info.keyBytes() != null; + assert info.valueBytes() != null || info.value() instanceof byte[] : + "Missing value bytes with invalid value: " + info.value(); + + // Need to call this method to initialize info properly. + marshalInfo(info, ctx); + + byte[] bytes = CU.marshal(ctx, info); + + msgSize += bytes.length; + + Collection<byte[]> serInfo = infoBytesMap.get(p); + + if (serInfo == null) { + msgSize += 4; + + infoBytesMap.put(p, serInfo = new LinkedList<>()); + } + + serInfo.add(bytes); + } + + /** {@inheritDoc} + * @param ctx*/ + @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException { + super.prepareMarshal(ctx); + + infoBytes = ctx.marshaller().marshal(infoBytesMap); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException { + super.finishUnmarshal(ctx, ldr); + + infoBytesMap = ctx.marshaller().unmarshal(infoBytes, ldr); + + GridCacheContext<K, V> cacheCtx = ctx.cacheContext(cacheId); + + for (Map.Entry<Integer, Collection<byte[]>> e : infoBytesMap.entrySet()) { + Collection<GridCacheEntryInfo<K, V>> entries = unmarshalCollection(e.getValue(), ctx, ldr); + + unmarshalInfos(entries, cacheCtx, ldr); + + infos.put(e.getKey(), entries); + } + } + + /** + * @return Number of entries in message. + */ + public int size() { + return infos.isEmpty() ? infoBytesMap.size() : infos.size(); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) + @Override public GridTcpCommunicationMessageAdapter clone() { + GridDhtPartitionSupplyMessage _clone = new GridDhtPartitionSupplyMessage(); + + clone0(_clone); + + return _clone; + } + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { + super.clone0(_msg); + + GridDhtPartitionSupplyMessage _clone = (GridDhtPartitionSupplyMessage)_msg; + + _clone.workerId = workerId; + _clone.updateSeq = updateSeq; + _clone.ack = ack; + _clone.last = last; + _clone.missed = missed; + _clone.infos = infos; + _clone.infoBytesMap = infoBytesMap; + _clone.infoBytes = infoBytes; + _clone.msgSize = msgSize; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.writeTo(buf)) + return false; + + if (!commState.typeWritten) { - if (!commState.putByte(directType())) ++ if (!commState.putByte(null, directType())) + return false; + + commState.typeWritten = true; + } + + switch (commState.idx) { + case 3: - if (!commState.putBoolean(ack)) ++ if (!commState.putBoolean("ack", ack)) + return false; + + commState.idx++; + + case 4: - if (!commState.putByteArray(infoBytes)) ++ if (!commState.putByteArray("infoBytes", infoBytes)) + return false; + + commState.idx++; + + case 5: + if (last != null) { + if (commState.it == null) { - if (!commState.putInt(last.size())) ++ if (!commState.putInt(null, last.size())) + return false; + + commState.it = last.iterator(); + } + + while (commState.it.hasNext() || commState.cur != NULL) { + if (commState.cur == NULL) + commState.cur = commState.it.next(); + - if (!commState.putInt((int)commState.cur)) ++ if (!commState.putInt(null, (int)commState.cur)) + return false; + + commState.cur = NULL; + } + + commState.it = null; + } else { - if (!commState.putInt(-1)) ++ if (!commState.putInt(null, -1)) + return false; + } + + commState.idx++; + + case 6: + if (missed != null) { + if (commState.it == null) { - if (!commState.putInt(missed.size())) ++ if (!commState.putInt(null, missed.size())) + return false; + + commState.it = missed.iterator(); + } + + while (commState.it.hasNext() || commState.cur != NULL) { + if (commState.cur == NULL) + commState.cur = commState.it.next(); + - if (!commState.putInt((int)commState.cur)) ++ if (!commState.putInt(null, (int)commState.cur)) + return false; + + commState.cur = NULL; + } + + commState.it = null; + } else { - if (!commState.putInt(-1)) ++ if (!commState.putInt(null, -1)) + return false; + } + + commState.idx++; + + case 7: - if (!commState.putLong(updateSeq)) ++ if (!commState.putLong("updateSeq", updateSeq)) + return false; + + commState.idx++; + + case 8: - if (!commState.putInt(workerId)) ++ if (!commState.putInt("workerId", workerId)) + return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean readFrom(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.readFrom(buf)) + return false; + + switch (commState.idx) { + case 3: - if (buf.remaining() < 1) - return false; ++ ack = commState.getBoolean("ack"); + - ack = commState.getBoolean(); ++ if (!commState.lastRead()) ++ return false; + + commState.idx++; + + case 4: - byte[] infoBytes0 = commState.getByteArray(); ++ infoBytes = commState.getByteArray("infoBytes"); + - if (infoBytes0 == BYTE_ARR_NOT_READ) ++ if (!commState.lastRead()) + return false; + - infoBytes = infoBytes0; - + commState.idx++; + + case 5: + if (commState.readSize == -1) { - if (buf.remaining() < 4) - return false; ++ commState.readSize = commState.getInt(null); + - commState.readSize = commState.getInt(); ++ if (!commState.lastRead()) ++ return false; + } + + if (commState.readSize >= 0) { + if (last == null) + last = new HashSet<>(commState.readSize); + + for (int i = commState.readItems; i < commState.readSize; i++) { - if (buf.remaining() < 4) - return false; ++ int _val = commState.getInt(null); + - int _val = commState.getInt(); ++ if (!commState.lastRead()) ++ return false; + + last.add((Integer)_val); + + commState.readItems++; + } + } + + commState.readSize = -1; + commState.readItems = 0; + + commState.idx++; + + case 6: + if (commState.readSize == -1) { - if (buf.remaining() < 4) - return false; ++ commState.readSize = commState.getInt(null); + - commState.readSize = commState.getInt(); ++ if (!commState.lastRead()) ++ return false; + } + + if (commState.readSize >= 0) { + if (missed == null) + missed = new HashSet<>(commState.readSize); + + for (int i = commState.readItems; i < commState.readSize; i++) { - if (buf.remaining() < 4) - return false; ++ int _val = commState.getInt(null); + - int _val = commState.getInt(); ++ if (!commState.lastRead()) ++ return false; + + missed.add((Integer)_val); + + commState.readItems++; + } + } + + commState.readSize = -1; + commState.readItems = 0; + + commState.idx++; + + case 7: - if (buf.remaining() < 8) - return false; ++ updateSeq = commState.getLong("updateSeq"); + - updateSeq = commState.getLong(); ++ if (!commState.lastRead()) ++ return false; + + commState.idx++; + + case 8: - if (buf.remaining() < 4) - return false; ++ workerId = commState.getInt("workerId"); + - workerId = commState.getInt(); ++ if (!commState.lastRead()) ++ return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 44; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridDhtPartitionSupplyMessage.class, this, + "size", size(), + "parts", infos.keySet(), + "super", super.toString()); + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java index 0000000,1f55c59..1eb069e mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java @@@ -1,0 -1,158 +1,154 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; + + import org.apache.ignite.internal.processors.cache.*; + import org.apache.ignite.internal.processors.cache.version.*; + import org.apache.ignite.internal.util.direct.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.jetbrains.annotations.*; + + import java.io.*; + import java.nio.*; + + /** + * Request for single partition info. + */ + abstract class GridDhtPartitionsAbstractMessage<K, V> extends GridCacheMessage<K, V> { + /** */ + private static final long serialVersionUID = 0L; + + /** Exchange ID. */ + private GridDhtPartitionExchangeId exchId; + + /** Last used cache version. */ + private GridCacheVersion lastVer; + + /** + * Required by {@link Externalizable}. + */ + protected GridDhtPartitionsAbstractMessage() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean allowForStartup() { + return true; + } + + /** + * @param exchId Exchange ID. + * @param lastVer Last version. + */ + GridDhtPartitionsAbstractMessage(GridDhtPartitionExchangeId exchId, @Nullable GridCacheVersion lastVer) { + this.exchId = exchId; + this.lastVer = lastVer; + } + + /** + * @return Exchange ID. + */ + public GridDhtPartitionExchangeId exchangeId() { + return exchId; + } + + /** + * @return Last used version among all nodes. + */ + @Nullable public GridCacheVersion lastVersion() { + return lastVer; + } + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { + super.clone0(_msg); + + GridDhtPartitionsAbstractMessage _clone = (GridDhtPartitionsAbstractMessage)_msg; + + _clone.exchId = exchId; + _clone.lastVer = lastVer; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.writeTo(buf)) + return false; + + if (!commState.typeWritten) { - if (!commState.putByte(directType())) ++ if (!commState.putByte(null, directType())) + return false; + + commState.typeWritten = true; + } + + switch (commState.idx) { + case 3: - if (!commState.putDhtPartitionExchangeId(exchId)) ++ if (!commState.putDhtPartitionExchangeId("exchId", exchId)) + return false; + + commState.idx++; + + case 4: - if (!commState.putCacheVersion(lastVer)) ++ if (!commState.putCacheVersion("lastVer", lastVer)) + return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean readFrom(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.readFrom(buf)) + return false; + + switch (commState.idx) { + case 3: - GridDhtPartitionExchangeId exchId0 = commState.getDhtPartitionExchangeId(); ++ exchId = commState.getDhtPartitionExchangeId("exchId"); + - if (exchId0 == DHT_PART_EXCHANGE_ID_NOT_READ) ++ if (!commState.lastRead()) + return false; + - exchId = exchId0; - + commState.idx++; + + case 4: - GridCacheVersion lastVer0 = commState.getCacheVersion(); ++ lastVer = commState.getCacheVersion("lastVer"); + - if (lastVer0 == CACHE_VER_NOT_READ) ++ if (!commState.lastRead()) + return false; + - lastVer = lastVer0; - + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridDhtPartitionsAbstractMessage.class, this, super.toString()); + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index 0000000,3f8cefb..ec138fd mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@@ -1,0 -1,259 +1,255 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; + + import org.apache.ignite.*; + import org.apache.ignite.cluster.*; + import org.apache.ignite.internal.*; + import org.apache.ignite.internal.processors.cache.*; + import org.apache.ignite.internal.processors.cache.version.*; + import org.apache.ignite.internal.util.direct.*; + import org.apache.ignite.internal.util.tostring.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.jetbrains.annotations.*; + + import java.io.*; + import java.nio.*; + import java.util.*; + + /** + * Information about partitions of all nodes in topology. + */ + public class GridDhtPartitionsFullMessage<K, V> extends GridDhtPartitionsAbstractMessage<K, V> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + @GridToStringInclude + @GridDirectTransient + private Map<Integer, GridDhtPartitionFullMap> parts = new HashMap<>(); + + /** */ + private byte[] partsBytes; + + /** Topology version. */ + private long topVer; + + @GridDirectTransient + private List<List<ClusterNode>> affAssignment; + + /** */ + private byte[] affAssignmentBytes; + + /** + * Required by {@link Externalizable}. + */ + public GridDhtPartitionsFullMessage() { + // No-op. + } + + /** + * @param id Exchange ID. + * @param lastVer Last version. + */ + public GridDhtPartitionsFullMessage(@Nullable GridDhtPartitionExchangeId id, @Nullable GridCacheVersion lastVer, + long topVer) { + super(id, lastVer); + + assert parts != null; + assert id == null || topVer == id.topologyVersion(); + + this.topVer = topVer; + } + + /** + * @return Local partitions. + */ + public Map<Integer, GridDhtPartitionFullMap> partitions() { + return parts; + } + + /** + * @param cacheId Cache ID. + * @param fullMap Full partitions map. + */ + public void addFullPartitionsMap(int cacheId, GridDhtPartitionFullMap fullMap) { + parts.put(cacheId, fullMap); + } + + /** {@inheritDoc} + * @param ctx*/ + @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException { + super.prepareMarshal(ctx); + + if (parts != null) + partsBytes = ctx.marshaller().marshal(parts); + + if (affAssignment != null) + affAssignmentBytes = ctx.marshaller().marshal(affAssignment); + } + + /** + * @return Topology version. + */ + @Override public long topologyVersion() { + return topVer; + } + + /** + * @param topVer Topology version. + */ + public void topologyVersion(long topVer) { + this.topVer = topVer; + } + + /** + * @return Affinity assignment for topology version. + */ + public List<List<ClusterNode>> affinityAssignment() { + return affAssignment; + } + + /** + * @param affAssignment Affinity assignment for topology version. + */ + public void affinityAssignment(List<List<ClusterNode>> affAssignment) { + this.affAssignment = affAssignment; + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException { + super.finishUnmarshal(ctx, ldr); + + if (partsBytes != null) + parts = ctx.marshaller().unmarshal(partsBytes, ldr); + + if (affAssignmentBytes != null) + affAssignment = ctx.marshaller().unmarshal(affAssignmentBytes, ldr); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) + @Override public GridTcpCommunicationMessageAdapter clone() { + GridDhtPartitionsFullMessage _clone = new GridDhtPartitionsFullMessage(); + + clone0(_clone); + + return _clone; + } + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { + super.clone0(_msg); + + GridDhtPartitionsFullMessage _clone = (GridDhtPartitionsFullMessage)_msg; + + _clone.parts = parts; + _clone.partsBytes = partsBytes; + _clone.topVer = topVer; + _clone.affAssignment = affAssignment; + _clone.affAssignmentBytes = affAssignmentBytes; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.writeTo(buf)) + return false; + + if (!commState.typeWritten) { - if (!commState.putByte(directType())) ++ if (!commState.putByte(null, directType())) + return false; + + commState.typeWritten = true; + } + + switch (commState.idx) { + case 5: - if (!commState.putByteArray(affAssignmentBytes)) ++ if (!commState.putByteArray("affAssignmentBytes", affAssignmentBytes)) + return false; + + commState.idx++; + + case 6: - if (!commState.putByteArray(partsBytes)) ++ if (!commState.putByteArray("partsBytes", partsBytes)) + return false; + + commState.idx++; + + case 7: - if (!commState.putLong(topVer)) ++ if (!commState.putLong("topVer", topVer)) + return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean readFrom(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.readFrom(buf)) + return false; + + switch (commState.idx) { + case 5: - byte[] affAssignmentBytes0 = commState.getByteArray(); ++ affAssignmentBytes = commState.getByteArray("affAssignmentBytes"); + - if (affAssignmentBytes0 == BYTE_ARR_NOT_READ) ++ if (!commState.lastRead()) + return false; + - affAssignmentBytes = affAssignmentBytes0; - + commState.idx++; + + case 6: - byte[] partsBytes0 = commState.getByteArray(); ++ partsBytes = commState.getByteArray("partsBytes"); + - if (partsBytes0 == BYTE_ARR_NOT_READ) ++ if (!commState.lastRead()) + return false; + - partsBytes = partsBytes0; - + commState.idx++; + + case 7: - if (buf.remaining() < 8) - return false; ++ topVer = commState.getLong("topVer"); + - topVer = commState.getLong(); ++ if (!commState.lastRead()) ++ return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 45; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridDhtPartitionsFullMessage.class, this, "partCnt", parts != null ? parts.size() : 0, + "super", super.toString()); + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index 0000000,00fef2b..823f1f6 mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@@ -1,0 -1,177 +1,175 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; + + import org.apache.ignite.*; + import org.apache.ignite.internal.*; + import org.apache.ignite.internal.processors.cache.*; + import org.apache.ignite.internal.processors.cache.version.*; + import org.apache.ignite.internal.util.direct.*; + import org.apache.ignite.internal.util.tostring.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.jetbrains.annotations.*; + + import java.io.*; + import java.nio.*; + import java.util.*; + + /** + * Information about partitions of a single node. + */ + public class GridDhtPartitionsSingleMessage<K, V> extends GridDhtPartitionsAbstractMessage<K, V> { + /** */ + private static final long serialVersionUID = 0L; + + /** Local partitions. */ + @GridToStringInclude + @GridDirectTransient + private Map<Integer, GridDhtPartitionMap> parts = new HashMap<>(); + + /** Serialized partitions. */ + private byte[] partsBytes; + + /** + * Required by {@link Externalizable}. + */ + public GridDhtPartitionsSingleMessage() { + // No-op. + } + + /** + * @param exchId Exchange ID. + * @param lastVer Last version. + */ + public GridDhtPartitionsSingleMessage(GridDhtPartitionExchangeId exchId, @Nullable GridCacheVersion lastVer) { + super(exchId, lastVer); + } + + /** + * Adds partition map to this message. + * + * @param cacheId Cache ID to add local partition for. + * @param locMap Local partition map. + */ + public void addLocalPartitionMap(int cacheId, GridDhtPartitionMap locMap) { + parts.put(cacheId, locMap); + } + + /** + * @return Local partitions. + */ + public Map<Integer, GridDhtPartitionMap> partitions() { + return parts; + } + + /** {@inheritDoc} + * @param ctx*/ + @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException { + super.prepareMarshal(ctx); + + if (parts != null) + partsBytes = ctx.marshaller().marshal(parts); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException { + super.finishUnmarshal(ctx, ldr); + + if (partsBytes != null) + parts = ctx.marshaller().unmarshal(partsBytes, ldr); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) + @Override public GridTcpCommunicationMessageAdapter clone() { + GridDhtPartitionsSingleMessage _clone = new GridDhtPartitionsSingleMessage(); + + clone0(_clone); + + return _clone; + } + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { + super.clone0(_msg); + + GridDhtPartitionsSingleMessage _clone = (GridDhtPartitionsSingleMessage)_msg; + + _clone.parts = parts; + _clone.partsBytes = partsBytes; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.writeTo(buf)) + return false; + + if (!commState.typeWritten) { - if (!commState.putByte(directType())) ++ if (!commState.putByte(null, directType())) + return false; + + commState.typeWritten = true; + } + + switch (commState.idx) { + case 5: - if (!commState.putByteArray(partsBytes)) ++ if (!commState.putByteArray("partsBytes", partsBytes)) + return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean readFrom(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.readFrom(buf)) + return false; + + switch (commState.idx) { + case 5: - byte[] partsBytes0 = commState.getByteArray(); ++ partsBytes = commState.getByteArray("partsBytes"); + - if (partsBytes0 == BYTE_ARR_NOT_READ) ++ if (!commState.lastRead()) + return false; + - partsBytes = partsBytes0; - + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 46; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridDhtPartitionsSingleMessage.class, this, super.toString()); + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java index 0000000,01aeb46..d3fbe40 mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java @@@ -1,0 -1,101 +1,101 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; + + import org.apache.ignite.internal.util.direct.*; + import org.apache.ignite.internal.util.typedef.internal.*; + + import java.io.*; + import java.nio.*; + + /** + * Request for single partition info. + */ + public class GridDhtPartitionsSingleRequest<K, V> extends GridDhtPartitionsAbstractMessage<K, V> { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Required by {@link Externalizable}. + */ + public GridDhtPartitionsSingleRequest() { + // No-op. + } + + /** + * @param id Exchange ID. + */ + GridDhtPartitionsSingleRequest(GridDhtPartitionExchangeId id) { + super(id, null); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) + @Override public GridTcpCommunicationMessageAdapter clone() { + GridDhtPartitionsSingleRequest _clone = new GridDhtPartitionsSingleRequest(); + + clone0(_clone); + + return _clone; + } + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { + super.clone0(_msg); + + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.writeTo(buf)) + return false; + + if (!commState.typeWritten) { - if (!commState.putByte(directType())) ++ if (!commState.putByte(null, directType())) + return false; + + commState.typeWritten = true; + } + + return true; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean readFrom(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.readFrom(buf)) + return false; + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 47; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridDhtPartitionsSingleRequest.class, this, super.toString()); + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java index 0000000,8934704..2a51d59 mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java @@@ -1,0 -1,597 +1,588 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.cache.distributed.near; + + import org.apache.ignite.*; + import org.apache.ignite.cache.*; + import org.apache.ignite.internal.*; + import org.apache.ignite.internal.processors.cache.*; + import org.apache.ignite.internal.processors.cache.version.*; + import org.apache.ignite.lang.*; + import org.apache.ignite.internal.util.direct.*; + import org.apache.ignite.internal.util.tostring.*; + import org.apache.ignite.internal.util.typedef.*; + import org.apache.ignite.internal.util.typedef.internal.*; + + import java.io.*; + import java.nio.*; + import java.util.*; + + /** + * Get request. + */ + public class GridNearGetRequest<K, V> extends GridCacheMessage<K, V> implements GridCacheDeployable, + GridCacheVersionable { + /** */ + private static final long serialVersionUID = 0L; + + /** Future ID. */ + private IgniteUuid futId; + + /** Sub ID. */ + private IgniteUuid miniId; + + /** Version. */ + private GridCacheVersion ver; + + /** */ + @GridToStringInclude + @GridDirectTransient + private LinkedHashMap<K, Boolean> keys; + + /** Reload flag. */ + private boolean reload; + + /** Read through flag. */ + private boolean readThrough; + + /** */ + @GridToStringExclude + @GridDirectMap(keyType = byte[].class, valueType = boolean.class) + private LinkedHashMap<byte[], Boolean> keyBytes; + + /** Filter bytes. */ + private byte[][] filterBytes; + + /** Topology version. */ + private long topVer; + + /** Filters. */ + @GridDirectTransient + private IgnitePredicate<CacheEntry<K, V>>[] filter; + + /** Subject ID. */ + @GridDirectVersion(1) + private UUID subjId; + + /** Task name hash. */ + @GridDirectVersion(2) + private int taskNameHash; + + /** TTL for read operation. */ + private long accessTtl; + + /** + * Empty constructor required for {@link Externalizable}. + */ + public GridNearGetRequest() { + // No-op. + } + + /** + * @param cacheId Cache ID. + * @param futId Future ID. + * @param miniId Sub ID. + * @param ver Version. + * @param keys Keys. + * @param readThrough Read through flag. + * @param reload Reload flag. + * @param topVer Topology version. + * @param filter Filter. + * @param subjId Subject ID. + * @param taskNameHash Task name hash. + * @param accessTtl New TTL to set after entry is accessed, -1 to leave unchanged. + */ + public GridNearGetRequest( + int cacheId, + IgniteUuid futId, + IgniteUuid miniId, + GridCacheVersion ver, + LinkedHashMap<K, Boolean> keys, + boolean readThrough, + boolean reload, + long topVer, + IgnitePredicate<CacheEntry<K, V>>[] filter, + UUID subjId, + int taskNameHash, + long accessTtl + ) { + assert futId != null; + assert miniId != null; + assert ver != null; + assert keys != null; + + this.cacheId = cacheId; + this.futId = futId; + this.miniId = miniId; + this.ver = ver; + this.keys = keys; + this.readThrough = readThrough; + this.reload = reload; + this.topVer = topVer; + this.filter = filter; + this.subjId = subjId; + this.taskNameHash = taskNameHash; + this.accessTtl = accessTtl; + } + + /** + * @return Future ID. + */ + public IgniteUuid futureId() { + return futId; + } + + /** + * @return Sub ID. + */ + public IgniteUuid miniId() { + return miniId; + } + + /** + * @return Subject ID. + */ + public UUID subjectId() { + return subjId; + } + + /** + * Gets task name hash. + * + * @return Task name hash. + */ + public int taskNameHash() { + return taskNameHash; + } + + /** {@inheritDoc} */ + @Override public GridCacheVersion version() { + return ver; + } + + /** + * @return Keys + */ + public LinkedHashMap<K, Boolean> keys() { + return keys; + } + + /** + * @return Reload flag. + */ + public boolean reload() { + return reload; + } + + /** + * @return Read through flag. + */ + public boolean readThrough() { + return readThrough; + } + + /** + * @return Topology version. + */ + @Override public long topologyVersion() { + return topVer; + } + + /** + * @return Filters. + */ + public IgnitePredicate<CacheEntry<K, V>>[] filter() { + return filter; + } + + /** + * @return New TTL to set after entry is accessed, -1 to leave unchanged. + */ + public long accessTtl() { + return accessTtl; + } + + /** + * @param ctx Cache context. + * @throws IgniteCheckedException If failed. + */ + @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException { + super.prepareMarshal(ctx); + + assert ctx != null; + assert !F.isEmpty(keys); + + if (keyBytes == null) + keyBytes = marshalBooleanLinkedMap(keys, ctx); + + if (filterBytes == null) + filterBytes = marshalFilter(filter, ctx); + } + + /** + * @param ctx Context. + * @param ldr Loader. + * @throws IgniteCheckedException If failed. + */ + @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException { + super.finishUnmarshal(ctx, ldr); + + if (keys == null) + keys = unmarshalBooleanLinkedMap(keyBytes, ctx, ldr); + + if (filter == null && filterBytes != null) + filter = unmarshalFilter(filterBytes, ctx, ldr); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) + @Override public GridTcpCommunicationMessageAdapter clone() { + GridNearGetRequest _clone = new GridNearGetRequest(); + + clone0(_clone); + + return _clone; + } + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { + super.clone0(_msg); + + GridNearGetRequest _clone = (GridNearGetRequest)_msg; + + _clone.futId = futId; + _clone.miniId = miniId; + _clone.ver = ver; + _clone.keys = keys; + _clone.reload = reload; + _clone.readThrough = readThrough; + _clone.keyBytes = keyBytes; + _clone.filterBytes = filterBytes; + _clone.topVer = topVer; + _clone.filter = filter; + _clone.subjId = subjId; + _clone.taskNameHash = taskNameHash; + _clone.accessTtl = accessTtl; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.writeTo(buf)) + return false; + + if (!commState.typeWritten) { - if (!commState.putByte(directType())) ++ if (!commState.putByte(null, directType())) + return false; + + commState.typeWritten = true; + } + + switch (commState.idx) { + case 3: + if (!commState.putLong(accessTtl)) + return false; + + commState.idx++; + + case 4: + if (filterBytes != null) { + if (commState.it == null) { - if (!commState.putInt(filterBytes.length)) ++ if (!commState.putInt(null, filterBytes.length)) + return false; + + commState.it = arrayIterator(filterBytes); + } + + while (commState.it.hasNext() || commState.cur != NULL) { + if (commState.cur == NULL) + commState.cur = commState.it.next(); + - if (!commState.putByteArray((byte[])commState.cur)) ++ if (!commState.putByteArray(null, (byte[])commState.cur)) + return false; + + commState.cur = NULL; + } + + commState.it = null; + } else { - if (!commState.putInt(-1)) ++ if (!commState.putInt(null, -1)) + return false; + } + + commState.idx++; + + case 5: - if (!commState.putGridUuid(futId)) ++ if (!commState.putGridUuid(null, futId)) + return false; + + commState.idx++; + + case 6: + if (keyBytes != null) { + if (commState.it == null) { - if (!commState.putInt(keyBytes.size())) ++ if (!commState.putInt(null, keyBytes.size())) + return false; + + commState.it = keyBytes.entrySet().iterator(); + } + + while (commState.it.hasNext() || commState.cur != NULL) { + if (commState.cur == NULL) + commState.cur = commState.it.next(); + + Map.Entry<byte[], Boolean> e = (Map.Entry<byte[], Boolean>)commState.cur; + + if (!commState.keyDone) { - if (!commState.putByteArray(e.getKey())) ++ if (!commState.putByteArray(null, e.getKey())) + return false; + + commState.keyDone = true; + } + - if (!commState.putBoolean(e.getValue())) ++ if (!commState.putBoolean(null, e.getValue())) + return false; + + commState.keyDone = false; + + commState.cur = NULL; + } + + commState.it = null; + } else { - if (!commState.putInt(-1)) ++ if (!commState.putInt(null, -1)) + return false; + } + + commState.idx++; + + case 7: - if (!commState.putGridUuid(miniId)) ++ if (!commState.putGridUuid(null, miniId)) + return false; + + commState.idx++; + + case 8: - if (!commState.putBoolean(readThrough)) ++ if (!commState.putBoolean(null, readThrough)) + return false; + + commState.idx++; + + case 9: - if (!commState.putBoolean(reload)) ++ if (!commState.putBoolean(null, reload)) + return false; + + commState.idx++; + + case 10: - if (!commState.putLong(topVer)) ++ if (!commState.putLong(null, topVer)) + return false; + + commState.idx++; + + case 11: - if (!commState.putCacheVersion(ver)) ++ if (!commState.putCacheVersion(null, ver)) + return false; + + commState.idx++; + + case 12: - if (!commState.putUuid(subjId)) ++ if (!commState.putUuid(null, subjId)) + return false; + + commState.idx++; + + case 13: - if (!commState.putInt(taskNameHash)) ++ if (!commState.putInt(null, taskNameHash)) + return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean readFrom(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.readFrom(buf)) + return false; + + switch (commState.idx) { + case 3: + if (buf.remaining() < 8) + return false; + + accessTtl = commState.getLong(); + + commState.idx++; + + case 4: + if (commState.readSize == -1) { - if (buf.remaining() < 4) - return false; ++ commState.readSize = commState.getInt(null); + - commState.readSize = commState.getInt(); ++ if (!commState.lastRead()) ++ return false; + } + + if (commState.readSize >= 0) { + if (filterBytes == null) + filterBytes = new byte[commState.readSize][]; + + for (int i = commState.readItems; i < commState.readSize; i++) { - byte[] _val = commState.getByteArray(); ++ byte[] _val = commState.getByteArray(null); + - if (_val == BYTE_ARR_NOT_READ) ++ if (!commState.lastRead()) + return false; + + filterBytes[i] = (byte[])_val; + + commState.readItems++; + } + } + + commState.readSize = -1; + commState.readItems = 0; + + commState.idx++; + + case 5: - IgniteUuid futId0 = commState.getGridUuid(); ++ IgniteUuid futId0 = commState.getGridUuid(null); + - if (futId0 == GRID_UUID_NOT_READ) ++ if (!commState.lastRead()) + return false; + - futId = futId0; - + commState.idx++; + + case 6: + if (commState.readSize == -1) { - if (buf.remaining() < 4) - return false; ++ commState.readSize = commState.getInt(null); + - commState.readSize = commState.getInt(); ++ if (!commState.lastRead()) ++ return false; + } + + if (commState.readSize >= 0) { + if (keyBytes == null) + keyBytes = new LinkedHashMap<>(commState.readSize, 1.0f); + + for (int i = commState.readItems; i < commState.readSize; i++) { + if (!commState.keyDone) { - byte[] _val = commState.getByteArray(); ++ byte[] _val = commState.getByteArray(null); + - if (_val == BYTE_ARR_NOT_READ) ++ if (!commState.lastRead()) + return false; + + commState.cur = _val; + commState.keyDone = true; + } + - if (buf.remaining() < 1) - return false; ++ boolean _val = commState.getBoolean(null); + - boolean _val = commState.getBoolean(); ++ if (!commState.lastRead()) ++ return false; + + keyBytes.put((byte[])commState.cur, _val); + + commState.keyDone = false; + + commState.readItems++; + } + } + + commState.readSize = -1; + commState.readItems = 0; + commState.cur = null; + + commState.idx++; + + case 7: - IgniteUuid miniId0 = commState.getGridUuid(); ++ IgniteUuid miniId0 = commState.getGridUuid(null); + - if (miniId0 == GRID_UUID_NOT_READ) ++ if (!commState.lastRead()) + return false; + - miniId = miniId0; - + commState.idx++; + + case 8: + if (buf.remaining() < 1) + return false; + - readThrough = commState.getBoolean(); ++ readThrough = commState.getBoolean(null); + + commState.idx++; + + case 9: + if (buf.remaining() < 1) + return false; + - reload = commState.getBoolean(); ++ reload = commState.getBoolean(null); + + commState.idx++; + + case 10: + if (buf.remaining() < 8) + return false; + - topVer = commState.getLong(); - + commState.idx++; + + case 11: - GridCacheVersion ver0 = commState.getCacheVersion(); ++ GridCacheVersion ver0 = commState.getCacheVersion(null); + - if (ver0 == CACHE_VER_NOT_READ) ++ if (!commState.lastRead()) + return false; + - ver = ver0; - + commState.idx++; + + case 12: - UUID subjId0 = commState.getUuid(); ++ UUID subjId0 = commState.getUuid(null); + - if (subjId0 == UUID_NOT_READ) ++ if (!commState.lastRead()) + return false; + - subjId = subjId0; - + commState.idx++; + + case 13: + if (buf.remaining() < 4) + return false; + - taskNameHash = commState.getInt(); ++ if (!commState.lastRead()) ++ return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 48; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridNearGetRequest.class, this); + } + }