Optimized DHT future - less collection creation.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ed96cfcb Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ed96cfcb Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ed96cfcb Branch: refs/heads/ignite-2523-1-resp-dht Commit: ed96cfcb051eafef3c0f939ee854fdb065c68309 Parents: 31604dd Author: vozerov-gridgain <[email protected]> Authored: Thu Apr 28 17:12:57 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Thu Apr 28 17:12:57 2016 +0300 ---------------------------------------------------------------------- .../GridDhtAtomicAbstractUpdateFuture.java | 79 +++++++++++++++----- 1 file changed, 60 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ed96cfcb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java index 3fc36cb..8d23e41 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java @@ -35,13 +35,13 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CI2; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; import javax.cache.processor.EntryProcessor; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import java.util.Map; import java.util.UUID; @@ -91,14 +91,21 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte /** Response count. */ protected volatile int resCnt; - // TODO: Optimize. + /** Node for single mapping. */ + private UUID mapSingleNode; + + /** Request for single mapping. */ + private GridDhtAtomicUpdateRequest mapSingleReq; + /** Mappings. */ @GridToStringInclude private Map<UUID, GridDhtAtomicUpdateRequest> mappings; - // TODO: Optimize. + /** Continuous query closure. */ + private CI1<Boolean> cntQryClo; + /** Continuous query closures. */ - private Collection<CI1<Boolean>> cntQryClsrs; + private List<CI1<Boolean>> cntQryClos; /** * Constructor. @@ -127,8 +134,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte this.writeVer = writeVer; waitForExchange = !(updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest())); - - mappings = U.newHashMap(updateReq.keysCount()); } /** @@ -356,23 +361,36 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte * Internal mapping routine. */ private void mapAll() { - for (GridDhtAtomicUpdateRequest req : mappings.values()) - sendRequest(req); + if (mapSingleReq != null) + sendRequest(mapSingleReq); + else { + for (GridDhtAtomicUpdateRequest req : mappings.values()) + sendRequest(req); + } } /** - * @param clsr Continuous query closure. + * @param clo Continuous query closure. */ - public void addContinuousQueryClosure(CI1<Boolean> clsr){ + public void addContinuousQueryClosure(CI1<Boolean> clo){ assert !isDone() : this; - if (cntQryClsrs == null) - cntQryClsrs = new ArrayList<>(10); + if (cntQryClo != null) { + cntQryClos = new ArrayList<>(2); - cntQryClsrs.add(clsr); + cntQryClos.add(cntQryClo); + + cntQryClo = null; + } + + if (cntQryClos != null) + cntQryClos.add(clo); + else + cntQryClo = clo; } /** {@inheritDoc} */ + @SuppressWarnings("ForLoopReplaceableByForEach") @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) { if (super.onDone(res, err)) { cctx.mvcc().removeAtomicFuture(version()); @@ -382,9 +400,11 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte if (!suc) markAllKeysFailed(err); - if (cntQryClsrs != null) { - for (CI1<Boolean> clsr : cntQryClsrs) - clsr.apply(suc); + if (cntQryClo != null) + cntQryClo.apply(suc); + else if (cntQryClos != null) { + for (int i = 0; i < cntQryClos.size(); i++) + cntQryClos.get(i).apply(suc); } if (updateReq.writeSynchronizationMode() == FULL_SYNC) @@ -417,7 +437,21 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte * @param req Request. */ private void mapping(UUID nodeId, GridDhtAtomicUpdateRequest req) { - mappings.put(nodeId, req); + if (mapSingleNode != null) { + mappings = U.newHashMap(updateReq.keysCount()); + + mappings.put(mapSingleNode, mapSingleReq); + + mapSingleNode = null; + mapSingleReq = null; + } + + if (mappings != null) + mappings.put(nodeId, req); + else { + mapSingleNode = nodeId; + mapSingleReq = req; + } } /** @@ -427,14 +461,21 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte * @return Mapping (if any). */ @Nullable protected GridDhtAtomicUpdateRequest mapping(UUID nodeId) { - return mappings.get(nodeId); + if (mapSingleNode != null) { + if (F.eq(mapSingleNode, nodeId)) + return mapSingleReq; + } + else if (mappings != null) + return mappings.get(nodeId); + + return null; } /** * @return Mappings number. */ protected int mappingsCount() { - return mappings != null ? mappings.size() : 0; + return mapSingleNode != null ? 1 : mappings != null ? mappings.size() : 0; } /**
