Repository: ignite Updated Branches: refs/heads/ignite-4705 7287a9368 -> eef1d3108
http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index 5dfea79..9da6b2e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -177,6 +177,8 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque int cacheId, UUID nodeId, long futId, + UUID nearNodeId, + long nearFutId, GridCacheVersion writeVer, CacheWriteSynchronizationMode syncMode, @NotNull AffinityTopologyVersion topVer, @@ -188,7 +190,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque boolean keepBinary, boolean skipStore ) { - super(cacheId, nodeId); + super(cacheId, nodeId, nearNodeId, nearFutId); this.futId = futId; this.writeVer = writeVer; @@ -610,139 +612,139 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque } switch (writer.state()) { - case 3: + case 6: if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes)) return false; writer.incrementState(); - case 4: + case 7: if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG)) return false; writer.incrementState(); - case 5: + case 8: if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR)) return false; writer.incrementState(); - case 6: + case 9: if (!writer.writeByte("flags", 
flags)) return false; writer.incrementState(); - case 7: + case 10: if (!writer.writeBoolean("forceTransformBackups", forceTransformBackups)) return false; writer.incrementState(); - case 8: + case 11: if (!writer.writeLong("futId", futId)) return false; writer.incrementState(); - case 9: + case 12: if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR)) return false; writer.incrementState(); - case 10: + case 13: if (!writer.writeBoolean("keepBinary", keepBinary)) return false; writer.incrementState(); - case 11: + case 14: if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG)) return false; writer.incrementState(); - case 12: + case 15: if (!writer.writeCollection("nearEntryProcessorsBytes", nearEntryProcessorsBytes, MessageCollectionItemType.BYTE_ARR)) return false; writer.incrementState(); - case 13: + case 16: if (!writer.writeMessage("nearExpireTimes", nearExpireTimes)) return false; writer.incrementState(); - case 14: + case 17: if (!writer.writeCollection("nearKeys", nearKeys, MessageCollectionItemType.MSG)) return false; writer.incrementState(); - case 15: + case 18: if (!writer.writeMessage("nearTtls", nearTtls)) return false; writer.incrementState(); - case 16: + case 19: if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG)) return false; writer.incrementState(); - case 17: + case 20: if (!writer.writeCollection("prevVals", prevVals, MessageCollectionItemType.MSG)) return false; writer.incrementState(); - case 18: + case 21: if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); - case 19: + case 22: if (!writer.writeByte("syncMode", syncMode != null ? 
(byte)syncMode.ordinal() : -1)) return false; writer.incrementState(); - case 20: + case 23: if (!writer.writeInt("taskNameHash", taskNameHash)) return false; writer.incrementState(); - case 21: + case 24: if (!writer.writeMessage("topVer", topVer)) return false; writer.incrementState(); - case 22: + case 25: if (!writer.writeMessage("ttls", ttls)) return false; writer.incrementState(); - case 23: + case 26: if (!writer.writeMessage("updateCntrs", updateCntrs)) return false; writer.incrementState(); - case 24: + case 27: if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG)) return false; writer.incrementState(); - case 25: + case 28: if (!writer.writeMessage("writeVer", writeVer)) return false; @@ -764,7 +766,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque return false; switch (reader.state()) { - case 3: + case 6: conflictExpireTimes = reader.readMessage("conflictExpireTimes"); if (!reader.isLastRead()) @@ -772,7 +774,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); - case 4: + case 7: conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -780,7 +782,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); - case 5: + case 8: entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR); if (!reader.isLastRead()) @@ -788,7 +790,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); - case 6: + case 9: flags = reader.readByte("flags"); if (!reader.isLastRead()) @@ -796,7 +798,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); - case 7: + case 10: forceTransformBackups = reader.readBoolean("forceTransformBackups"); if (!reader.isLastRead()) @@ -804,7 +806,7 @@ public class 
GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); - case 8: + case 11: futId = reader.readLong("futId"); if (!reader.isLastRead()) @@ -812,7 +814,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); - case 9: + case 12: invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class); if (!reader.isLastRead()) @@ -820,7 +822,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); - case 10: + case 13: keepBinary = reader.readBoolean("keepBinary"); if (!reader.isLastRead()) @@ -828,7 +830,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); - case 11: + case 14: keys = reader.readCollection("keys", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -836,7 +838,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); - case 12: + case 15: nearEntryProcessorsBytes = reader.readCollection("nearEntryProcessorsBytes", MessageCollectionItemType.BYTE_ARR); if (!reader.isLastRead()) @@ -844,7 +846,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); - case 13: + case 16: nearExpireTimes = reader.readMessage("nearExpireTimes"); if (!reader.isLastRead()) @@ -852,7 +854,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); - case 14: + case 17: nearKeys = reader.readCollection("nearKeys", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -860,7 +862,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); - case 15: + case 18: nearTtls = reader.readMessage("nearTtls"); if (!reader.isLastRead()) @@ -868,7 +870,7 @@ public class GridDhtAtomicUpdateRequest extends 
GridDhtAtomicAbstractUpdateReque reader.incrementState(); - case 16: + case 19: nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -876,7 +878,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); - case 17: + case 20: prevVals = reader.readCollection("prevVals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -884,7 +886,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); - case 18: + case 21: subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) @@ -892,7 +894,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); - case 19: + case 22: byte syncModeOrd; syncModeOrd = reader.readByte("syncMode"); @@ -904,7 +906,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); - case 20: + case 23: taskNameHash = reader.readInt("taskNameHash"); if (!reader.isLastRead()) @@ -912,7 +914,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); - case 21: + case 24: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -920,7 +922,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); - case 22: + case 25: ttls = reader.readMessage("ttls"); if (!reader.isLastRead()) @@ -928,7 +930,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); - case 23: + case 26: updateCntrs = reader.readMessage("updateCntrs"); if (!reader.isLastRead()) @@ -936,7 +938,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); - case 24: + case 27: vals = reader.readCollection("vals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -944,7 +946,7 @@ public class 
GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); - case 25: + case 28: writeVer = reader.readMessage("writeVer"); if (!reader.isLastRead()) @@ -970,7 +972,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 26; + return 29; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java index 76d28c9..c803d1a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java @@ -156,6 +156,13 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri nearEvicted.add(key); } + /** + * @param nearEvicted Evicted near cache keys. 
+ */ + void nearEvicted(List<KeyCacheObject> nearEvicted) { + this.nearEvicted = nearEvicted; + } + /** {@inheritDoc} */ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java index 42f4bc3..82f171d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java @@ -31,7 +31,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheMvccManager; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheReturn; -import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -299,7 +298,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt */ public abstract void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr); - public abstract void onResult(UUID nodeId, GridNearAtomicDhtResponse res); + public abstract void onResult(UUID nodeId, 
GridDhtAtomicNearResponse res); /** * @param req Request. http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicDhtResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicDhtResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicDhtResponse.java deleted file mode 100644 index fc99a5b..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicDhtResponse.java +++ /dev/null @@ -1,222 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; - -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Set; -import java.util.UUID; -import org.apache.ignite.internal.GridDirectCollection; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; -import org.apache.ignite.internal.processors.cache.GridCacheReturn; -import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; - -/** - * TODO IGNITE-4705: no need send mapping if it == affinity. - */ -public class GridNearAtomicDhtResponse extends GridCacheMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** Message index. */ - public static final int CACHE_MSG_IDX = nextIndexId(); - - /** */ - private static final int HAS_RESULT_MASK = 0x1; - - /** */ - private static final int RESULT_SUCCESS_MASK = 0x2; - - /** */ - private long futId; - - /** */ - @GridDirectCollection(UUID.class) - private List<UUID> mapping; - - /** */ - private byte flags; - - /** - * - */ - public GridNearAtomicDhtResponse() { - // No-op. - } - - /** - * @param futId Future ID. - * @param mapping Update mapping. - */ - public GridNearAtomicDhtResponse(long futId, List<UUID> mapping) { - this.futId = futId; - this.mapping = mapping; - } - - /** - * @param success Success flag. - */ - public void setResult(boolean success) { - setFlag(true, HAS_RESULT_MASK); - - setFlag(success, RESULT_SUCCESS_MASK); - } - - /** - * @return Operation result. - */ - public GridCacheReturn result() { - assert hasResult(); - - return new GridCacheReturn(true, isFlag(RESULT_SUCCESS_MASK)); - } - - /** - * @return {@code True} if response contains operation result. - */ - public boolean hasResult() { - return isFlag(HAS_RESULT_MASK); - } - - /** - * @return Update mapping. 
- */ - public List<UUID> mapping() { - return mapping; - } - - /** - * @param flag Set or clear. - * @param mask Mask. - */ - private void setFlag(boolean flag, int mask) { - flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask); - } - - /** - * Reads flag mask. - * - * @param mask Mask to read. - * @return Flag value. - */ - private boolean isFlag(int mask) { - return (flags & mask) != 0; - } - - /** - * @return Future ID. - */ - public long futureId() { - return futId; - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return -45; - } - - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return 6; - } - - /** {@inheritDoc} */ - @Override public boolean addDeploymentInfo() { - return false; - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!super.writeTo(buf, writer)) - return false; - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType(), fieldsCount())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 3: - if (!writer.writeByte("flags", flags)) - return false; - - writer.incrementState(); - - case 4: - if (!writer.writeLong("futId", futId)) - return false; - - writer.incrementState(); - - case 5: - if (!writer.writeCollection("mapping", mapping, MessageCollectionItemType.UUID)) - return false; - - writer.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - if (!reader.beforeMessageRead()) - return false; - - if (!super.readFrom(buf, reader)) - return false; - - switch (reader.state()) { - case 3: - flags = reader.readByte("flags"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 4: - futId = reader.readLong("futId"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 5: - mapping = 
reader.readCollection("mapping", MessageCollectionItemType.UUID); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - } - - return reader.afterMessageRead(GridNearAtomicDhtResponse.class); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java index 8c3a364..2016c98 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -50,6 +51,7 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; @@ -69,6 +71,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda private GridNearAtomicAbstractUpdateRequest req; /** */ + private Set<UUID> rcvd; + + /** */ private Set<UUID> mapping; /** @@ -130,6 +135,7 @@ public class 
GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda GridNearAtomicUpdateResponse res = null; GridNearAtomicAbstractUpdateRequest req; + GridCacheReturn opRes0 = null; synchronized (mux) { req = this.req != null && this.req.nodeId().equals(nodeId) ? this.req : null; @@ -147,6 +153,12 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda res.addFailedKeys(req.keys(), e); } + else { + if (mapping != null && mapping.remove(nodeId)) { + if (mapping.isEmpty() && opRes != null) + opRes0 = opRes; + } + } } if (res != null) { @@ -158,6 +170,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda onResult(nodeId, res, true); } + else if (opRes0 != null) + onDone(opRes0); return false; } @@ -193,16 +207,44 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda return false; } + /** + * @param nodeIds DHT nodes. + */ + private void initMapping(List<UUID> nodeIds) { + mapping = U.newHashSet(nodeIds.size()); + + for (UUID dhtNodeId : nodeIds) { + if (cctx.discovery().node(dhtNodeId) != null) + mapping.add(dhtNodeId); + } + } + /** {@inheritDoc} */ - @Override public void onResult(UUID nodeId, GridNearAtomicDhtResponse res) { + @Override public void onResult(UUID nodeId, GridDhtAtomicNearResponse res) { GridCacheReturn opRes0 = null; synchronized (mux) { if (futId == null || futId != res.futureId()) return; - if (mapping == null) - mapping = new HashSet<>(res.mapping()); + if (res.mapping() != null) { + // Mapping is sent from dht nodes. + if (mapping == null) + initMapping(res.mapping()); + } + else { + // Mapping and result are sent from primary. + if (mapping == null) { + if (rcvd == null) + rcvd = new HashSet<>(); + + rcvd.add(nodeId); + + return; // Need wait for response from primary. 
+ } + else + mapping.remove(nodeId); + } mapping.remove(nodeId); @@ -250,6 +292,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda mapErrTopVer = req.topologyVersion(); } else if (res.error() != null) { + // TODO IGNITE-4705: assert only 1 key? if (res.failedKeys() != null) { if (err == null) err = new CachePartialUpdateCheckedException( @@ -280,6 +323,18 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda } else opRes = ret; + + if (res.mapping() != null) { + initMapping(res.mapping()); + + if (rcvd != null) + mapping.removeAll(rcvd); + } + else + mapping = Collections.emptySet(); + + if (!mapping.isEmpty()) + return; } if (remapKey) { http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index c075f09..7b573b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -256,7 +256,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu } /** {@inheritDoc} */ - @Override public void onResult(UUID nodeId, GridNearAtomicDhtResponse res) { + @Override public void onResult(UUID nodeId, GridDhtAtomicNearResponse res) { throw new UnsupportedOperationException(); } 
http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java index b089193..f6c2a2e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java @@ -105,6 +105,10 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr /** Near expire times. */ private GridLongList nearExpireTimes; + /** */ + @GridDirectCollection(UUID.class) + private List<UUID> mapping; + /** * Empty constructor required by {@link Externalizable}. */ @@ -125,6 +129,20 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr this.addDepInfo = addDepInfo; } + /** + * @return Update mapping. + */ + public List<UUID> mapping() { + return mapping; + } + + /** + * @param mapping Mapping. 
+ */ + public void mapping(List<UUID> mapping) { + this.mapping = mapping; + } + /** {@inheritDoc} */ @Override public int lookupIndex() { return CACHE_MSG_IDX; @@ -472,48 +490,54 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr writer.incrementState(); case 6: - if (!writer.writeMessage("nearExpireTimes", nearExpireTimes)) + if (!writer.writeCollection("mapping", mapping, MessageCollectionItemType.UUID)) return false; writer.incrementState(); case 7: - if (!writer.writeCollection("nearSkipIdxs", nearSkipIdxs, MessageCollectionItemType.INT)) + if (!writer.writeMessage("nearExpireTimes", nearExpireTimes)) return false; writer.incrementState(); case 8: - if (!writer.writeMessage("nearTtls", nearTtls)) + if (!writer.writeCollection("nearSkipIdxs", nearSkipIdxs, MessageCollectionItemType.INT)) return false; writer.incrementState(); case 9: - if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG)) + if (!writer.writeMessage("nearTtls", nearTtls)) return false; writer.incrementState(); case 10: - if (!writer.writeCollection("nearValsIdxs", nearValsIdxs, MessageCollectionItemType.INT)) + if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 11: - if (!writer.writeMessage("nearVer", nearVer)) + if (!writer.writeCollection("nearValsIdxs", nearValsIdxs, MessageCollectionItemType.INT)) return false; writer.incrementState(); case 12: - if (!writer.writeCollection("remapKeys", remapKeys, MessageCollectionItemType.MSG)) + if (!writer.writeMessage("nearVer", nearVer)) return false; writer.incrementState(); case 13: + if (!writer.writeCollection("remapKeys", remapKeys, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + case 14: if (!writer.writeMessage("ret", ret)) return false; @@ -560,7 +584,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr reader.incrementState(); case 6: - 
nearExpireTimes = reader.readMessage("nearExpireTimes"); + mapping = reader.readCollection("mapping", MessageCollectionItemType.UUID); if (!reader.isLastRead()) return false; @@ -568,7 +592,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr reader.incrementState(); case 7: - nearSkipIdxs = reader.readCollection("nearSkipIdxs", MessageCollectionItemType.INT); + nearExpireTimes = reader.readMessage("nearExpireTimes"); if (!reader.isLastRead()) return false; @@ -576,7 +600,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr reader.incrementState(); case 8: - nearTtls = reader.readMessage("nearTtls"); + nearSkipIdxs = reader.readCollection("nearSkipIdxs", MessageCollectionItemType.INT); if (!reader.isLastRead()) return false; @@ -584,7 +608,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr reader.incrementState(); case 9: - nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG); + nearTtls = reader.readMessage("nearTtls"); if (!reader.isLastRead()) return false; @@ -592,7 +616,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr reader.incrementState(); case 10: - nearValsIdxs = reader.readCollection("nearValsIdxs", MessageCollectionItemType.INT); + nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -600,7 +624,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr reader.incrementState(); case 11: - nearVer = reader.readMessage("nearVer"); + nearValsIdxs = reader.readCollection("nearValsIdxs", MessageCollectionItemType.INT); if (!reader.isLastRead()) return false; @@ -608,7 +632,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr reader.incrementState(); case 12: - remapKeys = reader.readCollection("remapKeys", MessageCollectionItemType.MSG); + nearVer = 
reader.readMessage("nearVer"); if (!reader.isLastRead()) return false; @@ -616,6 +640,14 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr reader.incrementState(); case 13: + remapKeys = reader.readCollection("remapKeys", MessageCollectionItemType.MSG); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 14: ret = reader.readMessage("ret"); if (!reader.isLastRead()) @@ -635,7 +667,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 14; + return 15; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java new file mode 100644 index 0000000..106612c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.GridDirectCollection; +import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * Keys that failed to update together with the update error, readable and writable as a {@link Message}. + */ +public class UpdateErrors implements Message { + /** Failed keys. */ + @GridToStringInclude + @GridDirectCollection(KeyCacheObject.class) + private List<KeyCacheObject> failedKeys; + + /** Update error. */ + @GridDirectTransient + private IgniteCheckedException err; + + /** Serialized update error. */ + private byte[] errBytes; + + /** + * Creates an instance with no failed keys and no error. + */ + public UpdateErrors() { + // No-op. + } + + /** + * @param err Error to record; replaces any error recorded before. + */ + public void onError(IgniteCheckedException err){ + this.err = err; + } + + /** + * @return Recorded error or {@code null} if none is set.
+ */ + public IgniteCheckedException error() { + return err; + } + + /** + * @return Failed keys, may be {@code null}. + */ + public Collection<KeyCacheObject> failedKeys() { + return failedKeys; + } + + /** + * Adds key to collection of failed keys and attaches the cause to the update error as suppressed. + * + * @param key Key to add. + * @param e Error cause. + */ + public void addFailedKey(KeyCacheObject key, Throwable e) { + if (failedKeys == null) + failedKeys = new ArrayList<>(); + + failedKeys.add(key); + + if (err == null) + err = new IgniteCheckedException("Failed to update keys."); + + err.addSuppressed(e); + } + + /** Prepares failed keys for marshalling through {@code msg} and serializes the error to bytes if not done yet. */ + void prepareMarshal(GridCacheMessage msg, GridCacheContext cctx) throws IgniteCheckedException { + msg.prepareMarshalCacheObjects(failedKeys, cctx); + + if (errBytes == null) + errBytes = U.marshal(cctx.marshaller(), err); + } + + /** Finishes unmarshalling of failed keys through {@code msg} and restores the error from its bytes if not set yet. */ + void finishUnmarshal(GridCacheMessage msg, GridCacheContext cctx, ClassLoader ldr) throws IgniteCheckedException { + msg.finishUnmarshalCacheObjects(failedKeys, cctx, ldr); + + if (errBytes != null && err == null) + err = U.unmarshal(cctx.marshaller(), errBytes, U.resolveClassLoader(ldr, cctx.gridConfig())); + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeByteArray("errBytes", errBytes)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeCollection("failedKeys", failedKeys, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + errBytes = reader.readByteArray("errBytes"); + + if
(!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + failedKeys = reader.readCollection("failedKeys", MessageCollectionItemType.MSG); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(UpdateErrors.class); + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return -46; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 2; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 41632ef..f8ae661 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import java.io.Externalizable; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -43,7 +44,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheA import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache; -import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; @@ -299,11 +300,12 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { * @param nodeId Sender node ID. * @param req Dht atomic update request. * @param res Dht atomic update response. + * @return Evicted near keys (if any). */ - public void processDhtAtomicUpdateRequest( + @Nullable public List<KeyCacheObject> processDhtAtomicUpdateRequest( UUID nodeId, GridDhtAtomicAbstractUpdateRequest req, - GridDhtAtomicUpdateResponse res + GridDhtAtomicNearResponse res ) { GridCacheVersion ver = req.writeVersion(); @@ -313,6 +315,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash()); + List<KeyCacheObject> nearEvicted = null; + for (int i = 0; i < req.nearSize(); i++) { KeyCacheObject key = req.nearKey(i); @@ -322,7 +326,10 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { GridCacheEntryEx entry = peekEx(key); if (entry == null) { - res.addNearEvicted(key); + if (nearEvicted == null) + nearEvicted = new ArrayList<>(); + + nearEvicted.add(key); break; } @@ -388,6 +395,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { res.addFailedKey(key, new IgniteCheckedException("Failed to update near cache key: " + key, e)); } } + + return nearEvicted; } /** {@inheritDoc} */ 
http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutBenchmark.java index 40e563c..8d15e5e 100644 --- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutBenchmark.java +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutBenchmark.java @@ -22,7 +22,7 @@ import org.apache.ignite.IgniteCache; import org.apache.ignite.yardstick.cache.model.SampleValue; /** - * Ignite benchmark that performs invoke operations. + * Ignite benchmark that performs getAndPut operations. */ public class IgniteGetAndPutBenchmark extends IgniteCacheAbstractBenchmark<Integer, Object> { /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutTxBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutTxBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutTxBenchmark.java index 49ae985..0a3794c 100644 --- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutTxBenchmark.java +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutTxBenchmark.java @@ -27,7 +27,7 @@ import org.apache.ignite.yardstick.cache.model.SampleValue; import org.yardstickframework.BenchmarkConfiguration; /** - * Ignite benchmark that performs invoke operations. + * Ignite benchmark that performs getAndPut operations. 
*/ public class IgniteGetAndPutTxBenchmark extends IgniteCacheAbstractBenchmark<Integer, Object> { /** */
