#ignite-51: IgniteTxEntry implements Message: small fixes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/832657ef Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/832657ef Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/832657ef Branch: refs/heads/ignite-user-req Commit: 832657ef01740cfd34a3469573a7ada097c6cbef Parents: 21cdc80 Author: ivasilinets <ivasilin...@gridgain.com> Authored: Wed Mar 4 12:47:34 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Wed Mar 4 12:47:34 2015 +0300 ---------------------------------------------------------------------- .../communication/GridIoMessageFactory.java | 2 +- .../near/GridNearTxPrepareFuture.java | 6 +- .../near/GridNearTxPrepareResponse.java | 18 +-- .../GridNearTxPrepareResponseOwnedValue.java | 157 ------------------- .../near/NearTxPrepareResponseOwnedValue.java | 157 +++++++++++++++++++ .../cache/transactions/IgniteTxEntry.java | 22 ++- .../cache/transactions/TxEntryValueHolder.java | 24 +-- 7 files changed, 189 insertions(+), 197 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832657ef/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index f12a9bf..44f8c7b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -545,7 +545,7 @@ public class GridIoMessageFactory implements MessageFactory { break; case 99: - msg = new 
GridNearTxPrepareResponseOwnedValue(); + msg = new NearTxPrepareResponseOwnedValue(); break; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832657ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java index dc2c11d..2a6cddb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java @@ -934,7 +934,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut else { assert F.isEmpty(res.invalidPartitions()); - for (Map.Entry<IgniteTxKey, GridNearTxPrepareResponseOwnedValue> entry : res.ownedValues().entrySet()) { + for (Map.Entry<IgniteTxKey, NearTxPrepareResponseOwnedValue> entry : res.ownedValues().entrySet()) { IgniteTxEntry txEntry = tx.entry(entry.getKey()); assert txEntry != null; @@ -946,7 +946,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut if (cacheCtx.isNear()) { GridNearCacheEntry nearEntry = (GridNearCacheEntry)txEntry.cached(); - GridNearTxPrepareResponseOwnedValue tup = entry.getValue(); + NearTxPrepareResponseOwnedValue tup = entry.getValue(); nearEntry.resetFromPrimary(tup.cacheObject(), tx.xidVersion(), tup.version(), m.node().id()); @@ -954,7 +954,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut else if (txEntry.cached().detached()) { GridDhtDetachedCacheEntry detachedEntry = (GridDhtDetachedCacheEntry)txEntry.cached(); - GridNearTxPrepareResponseOwnedValue tup = 
entry.getValue(); + NearTxPrepareResponseOwnedValue tup = entry.getValue(); detachedEntry.resetFromPrimary(tup.cacheObject(), tx.xidVersion()); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832657ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java index d6819bf..6cf80ef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java @@ -61,7 +61,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse /** Map of owned values to set on near node. */ @GridToStringInclude @GridDirectTransient - private Map<IgniteTxKey, GridNearTxPrepareResponseOwnedValue> ownedVals; + private Map<IgniteTxKey, NearTxPrepareResponseOwnedValue> ownedVals; /** OwnedVals' keys for marshalling. */ @GridToStringExclude @@ -70,8 +70,8 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse /** OwnedVals' values for marshalling. */ @GridToStringExclude - @GridDirectCollection(GridNearTxPrepareResponseOwnedValue.class) - private Collection<GridNearTxPrepareResponseOwnedValue> ownedValVals; + @GridDirectCollection(NearTxPrepareResponseOwnedValue.class) + private Collection<NearTxPrepareResponseOwnedValue> ownedValVals; /** Cache return value. 
*/ @GridDirectTransient @@ -174,7 +174,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse if (ownedVals == null) ownedVals = new HashMap<>(); - GridNearTxPrepareResponseOwnedValue oVal = new GridNearTxPrepareResponseOwnedValue(ver, val); + NearTxPrepareResponseOwnedValue oVal = new NearTxPrepareResponseOwnedValue(ver, val); ownedVals.put(key, oVal); } @@ -182,9 +182,9 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse /** * @return Owned values map. */ - public Map<IgniteTxKey, GridNearTxPrepareResponseOwnedValue> ownedValues() { + public Map<IgniteTxKey, NearTxPrepareResponseOwnedValue> ownedValues() { return ownedVals == null ? - Collections.<IgniteTxKey, GridNearTxPrepareResponseOwnedValue>emptyMap() : + Collections.<IgniteTxKey, NearTxPrepareResponseOwnedValue>emptyMap() : Collections.unmodifiableMap(ownedVals); } @@ -234,7 +234,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse ownedValVals = ownedVals.values(); - for (Map.Entry<IgniteTxKey, GridNearTxPrepareResponseOwnedValue> entry : ownedVals.entrySet()) { + for (Map.Entry<IgniteTxKey, NearTxPrepareResponseOwnedValue> entry : ownedVals.entrySet()) { GridCacheContext cacheCtx = ctx.cacheContext(entry.getKey().cacheId()); entry.getKey().prepareMarshal(cacheCtx); @@ -266,14 +266,14 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse Iterator<IgniteTxKey> keyIter = ownedValKeys.iterator(); - Iterator<GridNearTxPrepareResponseOwnedValue> valueIter = ownedValVals.iterator(); + Iterator<NearTxPrepareResponseOwnedValue> valueIter = ownedValVals.iterator(); while (keyIter.hasNext()) { IgniteTxKey key = keyIter.next(); GridCacheContext cctx = ctx.cacheContext(key.cacheId()); - GridNearTxPrepareResponseOwnedValue value = valueIter.next(); + NearTxPrepareResponseOwnedValue value = valueIter.next(); key.finishUnmarshal(cctx, ldr); 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832657ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponseOwnedValue.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponseOwnedValue.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponseOwnedValue.java deleted file mode 100644 index ff0f35a..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponseOwnedValue.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed.near; - -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.version.*; -import org.apache.ignite.plugin.extensions.communication.*; - -import java.nio.*; - -/** - * Message for owned values to set on near node. 
- */ -public class GridNearTxPrepareResponseOwnedValue implements Message { - /** Cache version. */ - private GridCacheVersion vers; - - /** Cache object. */ - private CacheObject obj; - - /** */ - public GridNearTxPrepareResponseOwnedValue() { - // No-op. - } - - /** - * @param vers Cache version. - * @param obj Cache object. - */ - GridNearTxPrepareResponseOwnedValue(GridCacheVersion vers, CacheObject obj) { - this.vers = vers; - this.obj = obj; - } - - /** - * @return Cache version. - */ - public GridCacheVersion version() { - return vers; - } - - /** - * @return Cache object. - */ - public CacheObject cacheObject() { - return obj; - } - - /** - * This method is called before the whole message is sent - * and is responsible for pre-marshalling state. - * - * @param ctx Cache object context. - * @throws org.apache.ignite.IgniteCheckedException If failed. - */ - public void prepareMarshal(CacheObjectContext ctx) throws IgniteCheckedException { - if (obj != null) - obj.prepareMarshal(ctx); - } - - /** - * This method is called after the whole message is recived - * and is responsible for unmarshalling state. - * - * @param ctx Context. - * @param ldr Class loader. - * @throws org.apache.ignite.IgniteCheckedException If failed. 
- */ - public void finishUnmarshal(GridCacheContext ctx, ClassLoader ldr) throws IgniteCheckedException { - if (obj != null) - obj.finishUnmarshal(ctx, ldr); - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType(), fieldsCount())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 0: - if (!writer.writeMessage("obj", obj)) - return false; - - writer.incrementState(); - - case 1: - if (!writer.writeMessage("vers", vers)) - return false; - - writer.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - if (!reader.beforeMessageRead()) - return false; - - switch (reader.state()) { - case 0: - obj = reader.readMessage("obj"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 1: - vers = reader.readMessage("vers"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return 99; - } - - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return 2; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832657ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/NearTxPrepareResponseOwnedValue.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/NearTxPrepareResponseOwnedValue.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/NearTxPrepareResponseOwnedValue.java new file mode 100644 index 0000000..502b906 --- /dev/null +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/NearTxPrepareResponseOwnedValue.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.version.*; +import org.apache.ignite.plugin.extensions.communication.*; + +import java.nio.*; + +/** + * Message for owned values to set on near node. + */ +public class NearTxPrepareResponseOwnedValue implements Message { + /** Cache version. */ + private GridCacheVersion vers; + + /** Cache object. */ + private CacheObject obj; + + /** */ + public NearTxPrepareResponseOwnedValue() { + // No-op. + } + + /** + * @param vers Cache version. + * @param obj Cache object. + */ + NearTxPrepareResponseOwnedValue(GridCacheVersion vers, CacheObject obj) { + this.vers = vers; + this.obj = obj; + } + + /** + * @return Cache version. + */ + public GridCacheVersion version() { + return vers; + } + + /** + * @return Cache object. 
+ */ + public CacheObject cacheObject() { + return obj; + } + + /** + * This method is called before the whole message is sent + * and is responsible for pre-marshalling state. + * + * @param ctx Cache object context. + * @throws org.apache.ignite.IgniteCheckedException If failed. + */ + public void prepareMarshal(CacheObjectContext ctx) throws IgniteCheckedException { + if (obj != null) + obj.prepareMarshal(ctx); + } + + /** + * This method is called after the whole message is recived + * and is responsible for unmarshalling state. + * + * @param ctx Context. + * @param ldr Class loader. + * @throws org.apache.ignite.IgniteCheckedException If failed. + */ + public void finishUnmarshal(GridCacheContext ctx, ClassLoader ldr) throws IgniteCheckedException { + if (obj != null) + obj.finishUnmarshal(ctx, ldr); + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeMessage("obj", obj)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeMessage("vers", vers)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + obj = reader.readMessage("obj"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + vers = reader.readMessage("vers"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 99; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 2; + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832657ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index 4ac2ab6..43ea4e5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -100,7 +100,8 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { private GridCacheVersion explicitVer; /** DHT version. */ - private transient volatile GridCacheVersion dhtVer; + @GridDirectTransient + private volatile GridCacheVersion dhtVer; /** Put filters. */ @GridToStringInclude @@ -112,23 +113,29 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { private boolean filtersPassed; /** Flag indicating that filter is set and can not be replaced. */ - private transient boolean filtersSet; + @GridDirectTransient + private boolean filtersSet; /** Underlying cache entry. */ - private transient volatile GridCacheEntryEx entry; + @GridDirectTransient + private volatile GridCacheEntryEx entry; /** Cache registry. */ - private transient GridCacheContext<?, ?> ctx; + @GridDirectTransient + private GridCacheContext<?, ?> ctx; /** Prepared flag to prevent multiple candidate add. */ @SuppressWarnings({"TransientFieldNotInitialized"}) - private transient AtomicBoolean prepared = new AtomicBoolean(); + @GridDirectTransient + private AtomicBoolean prepared = new AtomicBoolean(); /** Lock flag for colocated cache. */ + @GridDirectTransient private transient boolean locked; /** Assigned node ID (required only for partitioned cache). 
*/ - private transient UUID nodeId; + @GridDirectTransient + private UUID nodeId; /** Flag if this node is a back up node. */ @GridDirectTransient @@ -142,6 +149,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { private ExpiryPolicy expiryPlc; /** Expiry policy transfer flag. */ + @GridDirectTransient private boolean transferExpiryPlc; /** Expiry policy bytes. */ @@ -738,7 +746,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { val.unmarshal(this.ctx, clsLdr); - if (transferExpiryPlc && expiryPlcBytes != null) + if (expiryPlcBytes != null) expiryPlc = ctx.marshaller().unmarshal(expiryPlcBytes, clsLdr); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832657ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxEntryValueHolder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxEntryValueHolder.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxEntryValueHolder.java index cafe643..9627026 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxEntryValueHolder.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxEntryValueHolder.java @@ -128,16 +128,6 @@ public class TxEntryValueHolder implements Message { throws IgniteCheckedException { if (hasWriteVal && val != null) val.prepareMarshal(ctx.cacheObjectContext()); - -// TODO IGNITE-51. -// boolean valIsByteArr = val != null && val instanceof byte[]; -// -// // Do not send write values to remote nodes. 
-// if (hasWriteVal && val != null && !valIsByteArr && valBytes == null && -// (depEnabled || !ctx.isUnmarshalValues())) -// valBytes = CU.marshal(sharedCtx, val); -// -// valBytesSent = hasWriteVal && !valIsByteArr && valBytes != null && (depEnabled || !ctx.isUnmarshalValues()); } /** @@ -148,10 +138,6 @@ public class TxEntryValueHolder implements Message { public void unmarshal(GridCacheContext<?, ?> ctx, ClassLoader ldr) throws IgniteCheckedException { if (hasWriteVal && val != null) val.finishUnmarshal(ctx, ldr); - -// TODO IGNITE-51. -// if (valBytes != null && val == null && (ctx.isUnmarshalValues() || op == TRANSFORM || depEnabled)) -// val = ctx.marshaller().unmarshal(valBytes, ldr); } /** {@inheritDoc} */ @@ -184,7 +170,7 @@ public class TxEntryValueHolder implements Message { writer.incrementState(); case 2: - if (hasWriteVal && !writer.writeMessage("val", val)) + if (!writer.writeMessage("val", hasWriteVal ? val : null)) return false; writer.incrementState(); @@ -223,12 +209,10 @@ public class TxEntryValueHolder implements Message { reader.incrementState(); case 2: - if (hasWriteVal) { - val = reader.readMessage("val"); + val = reader.readMessage("val"); - if (!reader.isLastRead()) - return false; - } + if (!reader.isLastRead()) + return false; reader.incrementState();