Reworked keys, vals, entryProcessors.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d55671d5 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d55671d5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d55671d5 Branch: refs/heads/ignite-2523-1 Commit: d55671d50c1a5b40710eb471240f86d0eefb1571 Parents: 92a311d Author: vozerov-gridgain <[email protected]> Authored: Tue Apr 19 15:17:04 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Tue Apr 19 15:17:04 2016 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheMessage.java | 48 +++++++++++ .../GridNearAtomicSingleUpdateRequest.java | 88 ++++++++------------ 2 files changed, 83 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d55671d5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java index 3c2ff13..84a14f6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java @@ -463,6 +463,24 @@ public abstract class GridCacheMessage implements Message { } /** + * @param obj Object to marshal. + * @param ctx Context. + * @return Marshaled object. + * @throws IgniteCheckedException If failed. + */ + protected byte[] marshal(Object obj, GridCacheContext ctx) throws IgniteCheckedException { + assert ctx != null; + + if (obj == null) + return null; + + if (addDepInfo) + prepareObject(obj, ctx); + + return CU.marshal(ctx, obj); + } + + /** * @param col Collection to marshal. * @param ctx Context. * @return Marshalled collection. @@ -539,6 +557,19 @@ public abstract class GridCacheMessage implements Message { } /** + * @param obj Object. + * @param ctx Context. + * @param ldr Class loader. + * @throws IgniteCheckedException If failed. + */ + @SuppressWarnings("ForLoopReplaceableByForEach") + protected final void finishUnmarshalCacheObject(CacheObject obj, GridCacheContext ctx, ClassLoader ldr) + throws IgniteCheckedException { + if (obj != null) + obj.finishUnmarshal(ctx.cacheObjectContext(), ldr); + } + + /** * @param col Collection. * @param ctx Context. * @param ldr Class loader. @@ -589,6 +620,23 @@ public abstract class GridCacheMessage implements Message { } /** + * @param bytes Object to unmarshal. + * @param ctx Context. + * @param ldr Loader. + * @return Unmarshalled collection. + * @throws IgniteCheckedException If failed. + */ + @Nullable protected <T> T unmarshal(@Nullable byte[] bytes, + GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { + assert ldr != null; + assert ctx != null; + + Marshaller marsh = ctx.marshaller(); + + return bytes == null ? null : marsh.<T>unmarshal(bytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + } + + /** * @param byteCol Collection to unmarshal. * @param ctx Context. * @param ldr Loader. http://git-wip-us.apache.org/repos/asf/ignite/blob/d55671d5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java index 381f67c..2dede00 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.CacheWriteSynchronizationMode; -import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; @@ -45,8 +44,8 @@ import javax.cache.expiry.ExpiryPolicy; import javax.cache.processor.EntryProcessor; import java.io.Externalizable; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.UUID; @@ -86,22 +85,18 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd /** Update operation. */ private GridCacheOperation op; - /** Keys to update. */ + /** Key to update. */ @GridToStringInclude - @GridDirectCollection(KeyCacheObject.class) - private List<KeyCacheObject> keys; + private KeyCacheObject key; - /** Values to update. */ - @GridDirectCollection(CacheObject.class) - private List<CacheObject> vals; + /** Value to update. */ + private CacheObject val; - /** Entry processors. */ - @GridDirectTransient - private List<EntryProcessor<Object, Object, Object>> entryProcessors; + /** Entry processor. */ + private EntryProcessor<Object, Object, Object> entryProcessor; - /** Entry processors bytes. */ - @GridDirectCollection(byte[].class) - private List<byte[]> entryProcessorsBytes; + /** Entry processor bytes. */ + private byte[] entryProcessorBytes; /** Optional arguments for entry processor. */ @GridDirectTransient @@ -218,8 +213,6 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd this.keepBinary = keepBinary; this.clientReq = clientReq; this.addDepInfo = addDepInfo; - - keys = new ArrayList<>(1); } /** {@inheritDoc} */ @@ -324,21 +317,15 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd assert val != null || op == DELETE; - keys.add(key); + this.key = key; if (entryProcessor != null) { - if (entryProcessors == null) - entryProcessors = new ArrayList<>(1); - - entryProcessors.add(entryProcessor); + this.entryProcessor = entryProcessor; } else if (val != null) { assert val instanceof CacheObject : val; - if (vals == null) - vals = new ArrayList<>(1); - - vals.add((CacheObject)val); + this.val = (CacheObject)val; } hasPrimary |= primary; @@ -346,17 +333,17 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd /** {@inheritDoc} */ @Override public List<KeyCacheObject> keys() { - return keys; + return Collections.singletonList(key); } /** {@inheritDoc} */ @Override public int keysCount() { - return keys.size(); + return 1; } /** {@inheritDoc} */ @Override public List<?> values() { - return op == TRANSFORM ? entryProcessors : vals; + return op == TRANSFORM ? Collections.singletonList(entryProcessor) : Collections.singletonList(val); } /** {@inheritDoc} */ @@ -372,23 +359,24 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd /** {@inheritDoc} */ @Override public CacheObject value(int idx) { assert op == UPDATE : op; + assert idx == 0; - return vals.get(idx); + return val; } /** {@inheritDoc} */ @Override public EntryProcessor<Object, Object, Object> entryProcessor(int idx) { assert op == TRANSFORM : op; + assert idx == 0; - return entryProcessors.get(idx); + return entryProcessor; } /** {@inheritDoc} */ @Override public CacheObject writeValue(int idx) { - if (vals != null) - return vals.get(idx); + assert idx == 0; - return null; + return val; } /** {@inheritDoc} */ @@ -439,7 +427,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd GridCacheContext cctx = ctx.cacheContext(cacheId); - prepareMarshalCacheObjects(keys, cctx); + prepareMarshalCacheObject(key, cctx); if (filter != null) { boolean hasFilter = false; @@ -464,14 +452,14 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd if (!addDepInfo && ctx.deploymentEnabled()) addDepInfo = true; - if (entryProcessorsBytes == null) - entryProcessorsBytes = marshalCollection(entryProcessors, cctx); + if (entryProcessorBytes == null) + entryProcessorBytes = marshal(entryProcessor, cctx); if (invokeArgsBytes == null) invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx); } else - prepareMarshalCacheObjects(vals, cctx); + prepareMarshalCacheObject(val, cctx); } /** {@inheritDoc} */ @@ -480,17 +468,17 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd GridCacheContext cctx = ctx.cacheContext(cacheId); - finishUnmarshalCacheObjects(keys, cctx, ldr); + finishUnmarshalCacheObject(key, cctx, ldr); if (op == TRANSFORM) { - if (entryProcessors == null) - entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr); + if (entryProcessor == null) + entryProcessor = unmarshal(entryProcessorBytes, ctx, ldr); if (invokeArgs == null) invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr); } else - finishUnmarshalCacheObjects(vals, cctx, ldr); + finishUnmarshalCacheObject(val, cctx, ldr); if (filter != null) { for (CacheEntryPredicate p : filter) { @@ -530,7 +518,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd writer.incrementState(); case 4: - if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR)) + if (!writer.writeByteArray("entryProcessorBytes", entryProcessorBytes)) return false; writer.incrementState(); @@ -578,7 +566,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd writer.incrementState(); case 12: - if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG)) + if (!writer.writeMessage("key", key)) return false; writer.incrementState(); @@ -638,7 +626,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd writer.incrementState(); case 22: - if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG)) + if (!writer.writeMessage("val", val)) return false; writer.incrementState(); @@ -668,7 +656,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd reader.incrementState(); case 4: - entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR); + entryProcessorBytes = reader.readByteArray("entryProcessorBytes"); if (!reader.isLastRead()) return false; @@ -732,7 +720,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd reader.incrementState(); case 12: - keys = reader.readCollection("keys", MessageCollectionItemType.MSG); + key = reader.readMessage("key"); if (!reader.isLastRead()) return false; @@ -820,7 +808,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd reader.incrementState(); case 22: - vals = reader.readCollection("vals", MessageCollectionItemType.MSG); + val = reader.readMessage("val"); if (!reader.isLastRead()) return false; @@ -834,14 +822,8 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd /** {@inheritDoc} */ @Override public void cleanup(boolean clearKeys) { - vals = null; - entryProcessors = null; - entryProcessorsBytes = null; invokeArgs = null; invokeArgsBytes = null; - - if (clearKeys) - keys = null; } /** {@inheritDoc} */
