Commenting out the RW locks.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e8a0f445 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e8a0f445 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e8a0f445 Branch: refs/heads/ignite-perftest Commit: e8a0f4450fdec918c630ec9bbe88e41e60263800 Parents: 4f941d6 Author: Alexey Goncharuk <[email protected]> Authored: Tue Nov 10 14:56:36 2015 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Tue Nov 10 14:56:36 2015 +0300 ---------------------------------------------------------------------- .../ignite/codegen/MessageCodeGenerator.java | 11 +- .../managers/communication/GridIoManager.java | 52 +- .../communication/GridIoMessageFactory.java | 6 + .../cache/GridCacheDeploymentManager.java | 2 +- .../processors/cache/GridCacheIoManager.java | 34 +- .../GridDistributedTxPrepareRequest.java | 59 ++- .../distributed/dht/GridDhtTxLocalAdapter.java | 9 + .../distributed/dht/GridDhtTxPrepareFuture.java | 3 +- .../dht/GridDhtTxPrepareRequest.java | 54 +- .../near/GridNearTxPrepareRequest.java | 50 +- .../internal/util/GridSpinReadWriteLock.java | 522 ++++++++++--------- .../internal/util/UUIDCollectionMessage.java | 111 ++++ .../ignite/internal/util/lang/GridFunc.java | 8 +- 13 files changed, 554 insertions(+), 367 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e8a0f445/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java ---------------------------------------------------------------------- diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java index 8d9a3f5..74c71c4 100644 --- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java +++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java @@ -43,6 +43,10 @@ import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.IgniteCodeGeneratingFail; +import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxPrepareRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest; +import org.apache.ignite.internal.util.UUIDCollectionMessage; import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; @@ -165,7 +169,12 @@ public class MessageCodeGenerator { MessageCodeGenerator gen = new MessageCodeGenerator(srcDir); - gen.generateAll(true); + gen.generateAndWrite(GridDistributedTxPrepareRequest.class); + gen.generateAndWrite(GridDhtTxPrepareRequest.class); + gen.generateAndWrite(GridNearTxPrepareRequest.class); + gen.generateAndWrite(UUIDCollectionMessage.class); + +// gen.generateAll(true); // gen.generateAndWrite(DataStreamerEntry.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/e8a0f445/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index b8af8da..a14a05a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -167,10 +167,10 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa private final Marshaller marsh; /** Busy lock. */ - private final GridSpinReadWriteLock busyLock = new GridSpinReadWriteLock(); +// private final GridSpinReadWriteLock busyLock = new GridSpinReadWriteLock(); /** Lock to sync maps access. */ - private final ReadWriteLock lock = new ReentrantReadWriteLock(); +// private final ReadWriteLock lock = new ReentrantReadWriteLock(); /** Fully started flag. When set to true, can send and receive messages. */ private volatile boolean started; @@ -396,7 +396,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } // Clean up delayed and ordered messages (need exclusive lock). - lock.writeLock().lock(); +// lock.writeLock().lock(); try { ConcurrentLinkedDeque8<DelayedMessage> waitList = waitMap.remove(nodeId); @@ -406,7 +406,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa "(sender node left topology): " + waitList); } finally { - lock.writeLock().unlock(); +// lock.writeLock().unlock(); } break; @@ -424,7 +424,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa // 1. Process wait list. Collection<Collection<DelayedMessage>> delayedMsgs = new ArrayList<>(); - lock.writeLock().lock(); +// lock.writeLock().lock(); try { started = true; @@ -442,7 +442,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } } finally { - lock.writeLock().unlock(); +// lock.writeLock().unlock(); } // After write lock released. @@ -501,19 +501,19 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa boolean interrupted = false; // Busy wait is intentional. - while (true) { - try { - if (busyLock.tryWriteLock(200, TimeUnit.MILLISECONDS)) - break; - else - Thread.sleep(200); - } - catch (InterruptedException ignore) { - // Preserve interrupt status & ignore. - // Note that interrupted flag is cleared. - interrupted = true; - } - } +// while (true) { +// try { +// if (busyLock.tryWriteLock(200, TimeUnit.MILLISECONDS)) +// break; +// else +// Thread.sleep(200); +// } +// catch (InterruptedException ignore) { +// // Preserve interrupt status & ignore. +// // Note that interrupted flag is cleared. +// interrupted = true; +// } +// } try { if (interrupted) @@ -529,7 +529,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa stopping = true; } finally { - busyLock.writeUnlock(); +// busyLock.writeUnlock(); } } @@ -553,7 +553,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa assert nodeId != null; assert msg != null; - busyLock.readLock(); +// busyLock.readLock(); try { if (stopping) { @@ -581,7 +581,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } if (!started) { - lock.readLock().lock(); +// lock.readLock().lock(); try { if (!started) { // Sets to true in write lock, so double checking. @@ -601,7 +601,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } } finally { - lock.readLock().unlock(); +// lock.readLock().unlock(); } } @@ -649,7 +649,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa U.error(log, "Failed to process message (will ignore): " + msg, e); } finally { - busyLock.readUnlock(); +// busyLock.readUnlock(); } } @@ -2001,7 +2001,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa return; } - busyLock.readLock(); +// busyLock.readLock(); try { if (stopping) { @@ -2077,7 +2077,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } } finally { - busyLock.readUnlock(); +// busyLock.readUnlock(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e8a0f445/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index ae8c753..2503eda 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -121,6 +121,7 @@ import org.apache.ignite.internal.processors.rest.handlers.task.GridTaskResultRe import org.apache.ignite.internal.processors.rest.handlers.task.GridTaskResultResponse; import org.apache.ignite.internal.util.GridByteArrayList; import org.apache.ignite.internal.util.GridLongList; +import org.apache.ignite.internal.util.UUIDCollectionMessage; import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageFactory; @@ -690,6 +691,11 @@ public class GridIoMessageFactory implements MessageFactory { break; + case 115: + msg = new UUIDCollectionMessage(); + + break; + // [-3..114] - this // [120..123] - DR // [-4..-22] - SQL http://git-wip-us.apache.org/repos/asf/ignite/blob/e8a0f445/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java index 40c5b0f..35e8b75 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java @@ -164,7 +164,7 @@ public class GridCacheDeploymentManager<K, V> extends GridCacheSharedManagerAdap * Callback on method enter. */ public void onEnter() { - if (!locDepOwner && depEnabled && !ignoreOwnership.get() + if (depEnabled && !locDepOwner && !ignoreOwnership.get() && !cctx.kernalContext().job().internal()) { ClassLoader ldr = Thread.currentThread().getContextClassLoader(); http://git-wip-us.apache.org/repos/asf/ignite/blob/e8a0f445/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 2334780..1a118a7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -101,7 +101,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { private boolean stopping; /** Mutex. */ - private final GridSpinReadWriteLock rw = new GridSpinReadWriteLock(); +// private final GridSpinReadWriteLock rw = new GridSpinReadWriteLock(); /** Deployment enabled. */ private boolean depEnabled; @@ -218,19 +218,19 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { boolean interrupted = false; // Busy wait is intentional. - while (true) { - try { - if (rw.tryWriteLock(200, TimeUnit.MILLISECONDS)) - break; - else - Thread.sleep(200); - } - catch (InterruptedException ignore) { - // Preserve interrupt status & ignore. - // Note that interrupted flag is cleared. - interrupted = true; - } - } +// while (true) { +// try { +// if (rw.tryWriteLock(200, TimeUnit.MILLISECONDS)) +// break; +// else +// Thread.sleep(200); +// } +// catch (InterruptedException ignore) { +// // Preserve interrupt status & ignore. +// // Note that interrupted flag is cleared. +// interrupted = true; +// } +// } if (interrupted) Thread.currentThread().interrupt(); @@ -239,7 +239,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { stopping = true; } finally { - rw.writeUnlock(); +// rw.writeUnlock(); } } @@ -251,7 +251,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { @SuppressWarnings({"unchecked", "ConstantConditions", "ThrowableResultOfMethodCallIgnored"}) private void onMessage0(final UUID nodeId, final GridCacheMessage cacheMsg, final IgniteBiInClosure<UUID, GridCacheMessage> c) { - rw.readLock(); +// rw.readLock(); try { if (stopping) { @@ -282,7 +282,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { if (depEnabled) cctx.deploy().ignoreOwnership(false); - rw.readUnlock(); +// rw.readUnlock(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e8a0f445/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java index 533c8ca..ba251e4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridDirectCollection; +import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; @@ -34,8 +35,11 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.UUIDCollectionMessage; import org.apache.ignite.internal.util.tostring.GridToStringBuilder; import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.C1; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; import org.apache.ignite.plugin.extensions.communication.MessageReader; @@ -52,6 +56,20 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage /** */ private static final long serialVersionUID = 0L; + /** Collection to message converter. */ + public static final C1<Collection<UUID>, UUIDCollectionMessage> COL_TO_MSG = new C1<Collection<UUID>, UUIDCollectionMessage>() { + @Override public UUIDCollectionMessage apply(Collection<UUID> uuids) { + return new UUIDCollectionMessage(uuids); + } + }; + + /** Message to collection converter. */ + public static final C1<UUIDCollectionMessage, Collection<UUID>> MSG_TO_COL = new C1<UUIDCollectionMessage, Collection<UUID>>() { + @Override public Collection<UUID> apply(UUIDCollectionMessage msg) { + return msg.uuids(); + } + }; + /** Thread ID. */ @GridToStringInclude private long threadId; @@ -106,6 +124,10 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage @GridDirectTransient private Map<UUID, Collection<UUID>> txNodes; + /** Tx nodes direct marshallable message. */ + @GridDirectMap(keyType = UUID.class, valueType = UUIDCollectionMessage.class) + private Map<UUID, UUIDCollectionMessage> txNodesMsg; + /** */ private byte[] txNodesBytes; @@ -302,8 +324,12 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage dhtVerVals = dhtVers.values(); } - if (txNodes != null) - txNodesBytes = ctx.marshaller().marshal(txNodes); + if (txNodesMsg == null) + txNodesMsg = F.viewReadOnly(txNodes, COL_TO_MSG); + + // TODO backward compatibility. +// if (txNodes != null) +// txNodesBytes = ctx.marshaller().marshal(txNodes); } /** {@inheritDoc} */ @@ -334,7 +360,10 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage } } - if (txNodesBytes != null) + if (txNodesMsg != null) + txNodes = F.viewReadOnly(txNodesMsg, MSG_TO_COL); + + if (txNodesBytes != null && txNodes == null) txNodes = ctx.marshaller().unmarshal(txNodesBytes, ldr); } @@ -431,18 +460,24 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage writer.incrementState(); case 19: - if (!writer.writeInt("txSize", txSize)) + if (!writer.writeMap("txNodesMsg", txNodesMsg, MessageCollectionItemType.UUID, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 20: - if (!writer.writeMessage("writeVer", writeVer)) + if (!writer.writeInt("txSize", txSize)) return false; writer.incrementState(); case 21: + if (!writer.writeMessage("writeVer", writeVer)) + return false; + + writer.incrementState(); + + case 22: if (!writer.writeCollection("writes", writes, MessageCollectionItemType.MSG)) return false; @@ -569,7 +604,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage reader.incrementState(); case 19: - txSize = reader.readInt("txSize"); + txNodesMsg = reader.readMap("txNodesMsg", MessageCollectionItemType.UUID, MessageCollectionItemType.MSG, false); if (!reader.isLastRead()) return false; @@ -577,7 +612,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage reader.incrementState(); case 20: - writeVer = reader.readMessage("writeVer"); + txSize = reader.readInt("txSize"); if (!reader.isLastRead()) return false; @@ -585,6 +620,14 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage reader.incrementState(); case 21: + writeVer = reader.readMessage("writeVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 22: writes = reader.readCollection("writes", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -604,7 +647,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 22; + return 23; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/e8a0f445/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 6de8795..951c18e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -185,6 +185,15 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { } /** + * Gets flag that indicates that originating node has a near cache that participates in this transaction. + * + * @return Has near cache flag. + */ + public boolean nearOnOriginatingNode() { + return nearOnOriginatingNode; + } + + /** * @return {@code True} if explicit lock transaction. */ public boolean explicitLock() { http://git-wip-us.apache.org/repos/asf/ignite/blob/e8a0f445/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 1d6f633..6cf2dea 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -699,7 +699,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter tx.activeCachesDeploymentEnabled()); if (prepErr == null) { - addDhtValues(res); + if (tx.needReturnValue() || tx.nearOnOriginatingNode()) + addDhtValues(res); GridCacheVersion min = tx.minVersion(); http://git-wip-us.apache.org/repos/asf/ignite/blob/e8a0f445/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java index fcd66c2..394ff89 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java @@ -345,79 +345,79 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { } switch (writer.state()) { - case 22: + case 23: if (!writer.writeIgniteUuid("futId", futId)) return false; writer.incrementState(); - case 23: + case 24: if (!writer.writeBitSet("invalidateNearEntries", invalidateNearEntries)) return false; writer.incrementState(); - case 24: + case 25: if (!writer.writeBoolean("last", last)) return false; writer.incrementState(); - case 25: + case 26: if (!writer.writeIgniteUuid("miniId", miniId)) return false; writer.incrementState(); - case 26: + case 27: if (!writer.writeUuid("nearNodeId", nearNodeId)) return false; writer.incrementState(); - case 27: + case 28: if (!writer.writeCollection("nearWrites", nearWrites, MessageCollectionItemType.MSG)) return false; writer.incrementState(); - case 28: + case 29: if (!writer.writeMessage("nearXidVer", nearXidVer)) return false; writer.incrementState(); - case 29: + case 30: if (!writer.writeCollection("ownedKeys", ownedKeys, MessageCollectionItemType.MSG)) return false; writer.incrementState(); - case 30: + case 31: if (!writer.writeCollection("ownedVals", ownedVals, MessageCollectionItemType.MSG)) return false; writer.incrementState(); - case 31: + case 32: if (!writer.writeBitSet("preloadKeys", preloadKeys)) return false; writer.incrementState(); - case 32: + case 33: if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); - case 33: + case 34: if (!writer.writeInt("taskNameHash", taskNameHash)) return false; writer.incrementState(); - case 34: + case 35: if (!writer.writeMessage("topVer", topVer)) return false; @@ -439,7 +439,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { return false; switch (reader.state()) { - case 22: + case 23: futId = reader.readIgniteUuid("futId"); if (!reader.isLastRead()) @@ -447,7 +447,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 23: + case 24: invalidateNearEntries = reader.readBitSet("invalidateNearEntries"); if (!reader.isLastRead()) @@ -455,7 +455,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 24: + case 25: last = reader.readBoolean("last"); if (!reader.isLastRead()) @@ -463,7 +463,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 25: + case 26: miniId = reader.readIgniteUuid("miniId"); if (!reader.isLastRead()) @@ -471,7 +471,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 26: + case 27: nearNodeId = reader.readUuid("nearNodeId"); if (!reader.isLastRead()) @@ -479,7 +479,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 27: + case 28: nearWrites = reader.readCollection("nearWrites", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -487,7 +487,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 28: + case 29: nearXidVer = reader.readMessage("nearXidVer"); if (!reader.isLastRead()) @@ -495,7 +495,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 29: + case 30: ownedKeys = reader.readCollection("ownedKeys", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -503,7 +503,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 30: + case 31: ownedVals = reader.readCollection("ownedVals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -511,7 +511,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 31: + case 32: preloadKeys = reader.readBitSet("preloadKeys"); if (!reader.isLastRead()) @@ -519,7 +519,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 32: + case 33: subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) @@ -527,7 +527,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 33: + case 34: taskNameHash = reader.readInt("taskNameHash"); if (!reader.isLastRead()) @@ -535,7 +535,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 34: + case 35: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -555,6 +555,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 35; + return 36; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e8a0f445/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java index 456d726..5a006d6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java @@ -291,73 +291,73 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { } switch (writer.state()) { - case 22: + case 23: if (!writer.writeBoolean("explicitLock", explicitLock)) return false; writer.incrementState(); - case 23: + case 24: if (!writer.writeBoolean("firstClientReq", firstClientReq)) return false; writer.incrementState(); - case 24: + case 25: if (!writer.writeIgniteUuid("futId", futId)) return false; writer.incrementState(); - case 25: + case 26: if (!writer.writeBoolean("implicitSingle", implicitSingle)) return false; writer.incrementState(); - case 26: + case 27: if (!writer.writeBoolean("last", last)) return false; writer.incrementState(); - case 27: + case 28: if (!writer.writeCollection("lastBackups", lastBackups, MessageCollectionItemType.UUID)) return false; writer.incrementState(); - case 28: + case 29: if (!writer.writeIgniteUuid("miniId", miniId)) return false; writer.incrementState(); - case 29: + case 30: if (!writer.writeBoolean("near", near)) return false; writer.incrementState(); - case 30: + case 31: if (!writer.writeBoolean("retVal", retVal)) return false; writer.incrementState(); - case 31: + case 32: if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); - case 32: + case 33: if (!writer.writeInt("taskNameHash", taskNameHash)) return false; writer.incrementState(); - case 33: + case 34: if (!writer.writeMessage("topVer", topVer)) return false; @@ -379,7 +379,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { return false; switch (reader.state()) { - case 22: + case 23: explicitLock = reader.readBoolean("explicitLock"); if (!reader.isLastRead()) @@ -387,7 +387,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 23: + case 24: firstClientReq = reader.readBoolean("firstClientReq"); if (!reader.isLastRead()) @@ -395,7 +395,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 24: + case 25: futId = reader.readIgniteUuid("futId"); if (!reader.isLastRead()) @@ -403,7 +403,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 25: + case 26: implicitSingle = reader.readBoolean("implicitSingle"); if (!reader.isLastRead()) @@ -411,7 +411,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 26: + case 27: last = reader.readBoolean("last"); if (!reader.isLastRead()) @@ -419,7 +419,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 27: + case 28: lastBackups = reader.readCollection("lastBackups", MessageCollectionItemType.UUID); if (!reader.isLastRead()) @@ -427,7 +427,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 28: + case 29: miniId = reader.readIgniteUuid("miniId"); if (!reader.isLastRead()) @@ -435,7 +435,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 29: + case 30: near = reader.readBoolean("near"); if (!reader.isLastRead()) @@ -443,7 +443,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 30: + case 31: retVal = reader.readBoolean("retVal"); if (!reader.isLastRead()) @@ -451,7 +451,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 31: + case 32: subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) @@ -459,7 +459,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 32: + case 33: taskNameHash = reader.readInt("taskNameHash"); if (!reader.isLastRead()) @@ -467,7 +467,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 33: + case 34: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -487,7 +487,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 34; + return 35; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/e8a0f445/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java index a1fa892..115fd80 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java @@ -78,44 +78,44 @@ public class GridSpinReadWriteLock { */ @SuppressWarnings("BusyWait") public void readLock() { - int cnt = readLockEntryCnt.get(); - - // Read lock reentry or acquiring read lock while holding write lock. - if (cnt > 0 || Thread.currentThread().getId() == writeLockOwner) { - assert state > 0 || state == -1; - - readLockEntryCnt.set(cnt + 1); - - return; - } - - boolean interrupted = false; - - while (true) { - int cur = state; - - assert cur >= -1; - - if (cur == -1 || pendingWLocks > 0) { - try { - Thread.sleep(10); - } - catch (InterruptedException ignored) { - interrupted = true; - } - - continue; - } - - if (compareAndSet(STATE_OFFS, cur, cur + 1)) { - if (interrupted) - Thread.currentThread().interrupt(); - - break; - } - } - - readLockEntryCnt.set(1); +// int cnt = readLockEntryCnt.get(); +// +// // Read lock reentry or acquiring read lock while holding write lock. +// if (cnt > 0 || Thread.currentThread().getId() == writeLockOwner) { +// assert state > 0 || state == -1; +// +// readLockEntryCnt.set(cnt + 1); +// +// return; +// } +// +// boolean interrupted = false; +// +// while (true) { +// int cur = state; +// +// assert cur >= -1; +// +// if (cur == -1 || pendingWLocks > 0) { +// try { +// Thread.sleep(10); +// } +// catch (InterruptedException ignored) { +// interrupted = true; +// } +// +// continue; +// } +// +// if (compareAndSet(STATE_OFFS, cur, cur + 1)) { +// if (interrupted) +// Thread.currentThread().interrupt(); +// +// break; +// } +// } +// +// readLockEntryCnt.set(1); } /** @@ -124,60 +124,62 @@ public class GridSpinReadWriteLock { * @return {@code true} if acquired. */ public boolean tryReadLock() { - int cnt = readLockEntryCnt.get(); - - // Read lock reentry or acquiring read lock while holding write lock. - if (cnt > 0 || Thread.currentThread().getId() == writeLockOwner) { - assert state > 0 || state == -1; - - readLockEntryCnt.set(cnt + 1); - - return true; - } - - while (true) { - int cur = state; - - if (cur == -1 || pendingWLocks > 0) - return false; - - if (compareAndSet(STATE_OFFS, cur, cur + 1)) { - readLockEntryCnt.set(1); - - return true; - } - } +// int cnt = readLockEntryCnt.get(); +// +// // Read lock reentry or acquiring read lock while holding write lock. +// if (cnt > 0 || Thread.currentThread().getId() == writeLockOwner) { +// assert state > 0 || state == -1; +// +// readLockEntryCnt.set(cnt + 1); +// +// return true; +// } +// +// while (true) { +// int cur = state; +// +// if (cur == -1 || pendingWLocks > 0) +// return false; +// +// if (compareAndSet(STATE_OFFS, cur, cur + 1)) { +// readLockEntryCnt.set(1); +// +// return true; +// } +// } + + return true; } /** * Read unlock. */ public void readUnlock() { - int cnt = readLockEntryCnt.get(); - - if (cnt == 0) - throw new IllegalMonitorStateException(); - - // Read unlock when holding write lock is performed here. - if (cnt > 1 || Thread.currentThread().getId() == writeLockOwner) { - assert state > 0 || state == -1; - - readLockEntryCnt.set(cnt - 1); - - return; - } - - while (true) { - int cur = state; - - assert cur > 0; - - if (compareAndSet(STATE_OFFS, cur, cur - 1)) { - readLockEntryCnt.set(0); - - return; - } - } +// int cnt = readLockEntryCnt.get(); +// +// if (cnt == 0) +// throw new IllegalMonitorStateException(); +// +// // Read unlock when holding write lock is performed here. +// if (cnt > 1 || Thread.currentThread().getId() == writeLockOwner) { +// assert state > 0 || state == -1; +// +// readLockEntryCnt.set(cnt - 1); +// +// return; +// } +// +// while (true) { +// int cur = state; +// +// assert cur > 0; +// +// if (compareAndSet(STATE_OFFS, cur, cur - 1)) { +// readLockEntryCnt.set(0); +// +// return; +// } +// } } /** @@ -185,95 +187,95 @@ public class GridSpinReadWriteLock { */ @SuppressWarnings("BusyWait") public void writeLock() { - long threadId = Thread.currentThread().getId(); - - if (threadId == writeLockOwner) { - assert state == -1; - - writeLockEntryCnt++; - - return; - } - - // Increment pending write locks. - while (true) { - int pendingWLocks0 = pendingWLocks; - - if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 + 1)) - break; - } - - boolean interrupted = false; - - while (!compareAndSet(STATE_OFFS, 0, -1)) { - try { - Thread.sleep(10); - } - catch (InterruptedException ignored) { - interrupted = true; - } - } - - // Decrement pending write locks. - while (true) { - int pendingWLocks0 = pendingWLocks; - - assert pendingWLocks0 > 0; - - if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 - 1)) - break; - } - - if (interrupted) - Thread.currentThread().interrupt(); - - assert writeLockOwner == -1; - - writeLockOwner = threadId; - writeLockEntryCnt = 1; +// long threadId = Thread.currentThread().getId(); +// +// if (threadId == writeLockOwner) { +// assert state == -1; +// +// writeLockEntryCnt++; +// +// return; +// } +// +// // Increment pending write locks. +// while (true) { +// int pendingWLocks0 = pendingWLocks; +// +// if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 + 1)) +// break; +// } +// +// boolean interrupted = false; +// +// while (!compareAndSet(STATE_OFFS, 0, -1)) { +// try { +// Thread.sleep(10); +// } +// catch (InterruptedException ignored) { +// interrupted = true; +// } +// } +// +// // Decrement pending write locks. +// while (true) { +// int pendingWLocks0 = pendingWLocks; +// +// assert pendingWLocks0 > 0; +// +// if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 - 1)) +// break; +// } +// +// if (interrupted) +// Thread.currentThread().interrupt(); +// +// assert writeLockOwner == -1; +// +// writeLockOwner = threadId; +// writeLockEntryCnt = 1; } /** * Acquires write lock without sleeping between unsuccessful attempts. */ public void writeLock0() { - long threadId = Thread.currentThread().getId(); - - if (threadId == writeLockOwner) { - assert state == -1; - - writeLockEntryCnt++; - - return; - } - - // Increment pending write locks. - while (true) { - int pendingWLocks0 = pendingWLocks; - - if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 + 1)) - break; - } - - for (;;) { - if (compareAndSet(STATE_OFFS, 0, -1)) - break; - } - - // Decrement pending write locks. - while (true) { - int pendingWLocks0 = pendingWLocks; - - assert pendingWLocks0 > 0; - - if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 - 1)) - break; - } - - assert writeLockOwner == -1; - - writeLockOwner = threadId; - writeLockEntryCnt = 1; +// long threadId = Thread.currentThread().getId(); +// +// if (threadId == writeLockOwner) { +// assert state == -1; +// +// writeLockEntryCnt++; +// +// return; +// } +// +// // Increment pending write locks. +// while (true) { +// int pendingWLocks0 = pendingWLocks; +// +// if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 + 1)) +// break; +// } +// +// for (;;) { +// if (compareAndSet(STATE_OFFS, 0, -1)) +// break; +// } +// +// // Decrement pending write locks. +// while (true) { +// int pendingWLocks0 = pendingWLocks; +// +// assert pendingWLocks0 > 0; +// +// if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 - 1)) +// break; +// } +// +// assert writeLockOwner == -1; +// +// writeLockOwner = threadId; +// writeLockEntryCnt = 1; } /** @@ -289,26 +291,28 @@ public class GridSpinReadWriteLock { * @return {@code True} if write lock has been acquired. */ public boolean tryWriteLock() { - long threadId = Thread.currentThread().getId(); - - if (threadId == writeLockOwner) { - assert state == -1; - - writeLockEntryCnt++; - - return true; - } - - if (compareAndSet(STATE_OFFS, 0, -1)) { - assert writeLockOwner == -1; - - writeLockOwner = threadId; - writeLockEntryCnt = 1; - - return true; - } - - return false; +// long threadId = Thread.currentThread().getId(); +// +// if (threadId == writeLockOwner) { +// assert state == -1; +// +// writeLockEntryCnt++; +// +// return true; +// } +// +// if (compareAndSet(STATE_OFFS, 0, -1)) { +// assert writeLockOwner == -1; +// +// writeLockOwner = threadId; +// writeLockEntryCnt = 1; +// +// return true; +// } +// +// return false; + + return true; } /** @@ -319,81 +323,83 @@ public class GridSpinReadWriteLock { */ @SuppressWarnings("BusyWait") public boolean tryWriteLock(long timeout, TimeUnit unit) throws InterruptedException { - long threadId = Thread.currentThread().getId(); - - if (threadId == writeLockOwner) { - assert state == -1; - - writeLockEntryCnt++; - - return true; - } - - try { - // Increment pending write locks. - while (true) { - int pendingWLocks0 = pendingWLocks; - - if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 + 1)) - break; - } - - long end = U.currentTimeMillis() + unit.toMillis(timeout); - - while (true) { - if (compareAndSet(STATE_OFFS, 0, -1)) { - assert writeLockOwner == -1; - - writeLockOwner = threadId; - writeLockEntryCnt = 1; - - return true; - } - - Thread.sleep(10); - - if (end <= U.currentTimeMillis()) - return false; - } - } - finally { - // Decrement pending write locks. - while (true) { - int pendingWLocks0 = pendingWLocks; - - assert pendingWLocks0 > 0; - - if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 - 1)) - break; - } - } +// long threadId = Thread.currentThread().getId(); +// +// if (threadId == writeLockOwner) { +// assert state == -1; +// +// writeLockEntryCnt++; +// +// return true; +// } +// +// try { +// // Increment pending write locks. +// while (true) { +// int pendingWLocks0 = pendingWLocks; +// +// if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 + 1)) +// break; +// } +// +// long end = U.currentTimeMillis() + unit.toMillis(timeout); +// +// while (true) { +// if (compareAndSet(STATE_OFFS, 0, -1)) { +// assert writeLockOwner == -1; +// +// writeLockOwner = threadId; +// writeLockEntryCnt = 1; +// +// return true; +// } +// +// Thread.sleep(10); +// +// if (end <= U.currentTimeMillis()) +// return false; +// } +// } +// finally { +// // Decrement pending write locks. +// while (true) { +// int pendingWLocks0 = pendingWLocks; +// +// assert pendingWLocks0 > 0; +// +// if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 - 1)) +// break; +// } +// } + + return true; } /** * Releases write lock. */ public void writeUnlock() { - long threadId = Thread.currentThread().getId(); - - if (threadId != writeLockOwner) - throw new IllegalMonitorStateException(); - - if (writeLockEntryCnt > 1) { - writeLockEntryCnt--; - - return; - } - - writeLockEntryCnt = 0; - writeLockOwner = -1; - - // Current thread holds write and read locks and is releasing - // write lock now. - int update = readLockEntryCnt.get() > 0 ? 1 : 0; - - boolean b = compareAndSet(STATE_OFFS, -1, update); - - assert b; +// long threadId = Thread.currentThread().getId(); +// +// if (threadId != writeLockOwner) +// throw new IllegalMonitorStateException(); +// +// if (writeLockEntryCnt > 1) { +// writeLockEntryCnt--; +// +// return; +// } +// +// writeLockEntryCnt = 0; +// writeLockOwner = -1; +// +// // Current thread holds write and read locks and is releasing +// // write lock now. +// int update = readLockEntryCnt.get() > 0 ? 1 : 0; +// +// boolean b = compareAndSet(STATE_OFFS, -1, update); +// +// assert b; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/e8a0f445/modules/core/src/main/java/org/apache/ignite/internal/util/UUIDCollectionMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/UUIDCollectionMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/util/UUIDCollectionMessage.java new file mode 100644 index 0000000..28b07f5 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/UUIDCollectionMessage.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.UUID; +import org.apache.ignite.internal.GridDirectCollection; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * + */ +public class UUIDCollectionMessage implements Message { + /** */ + @GridDirectCollection(UUID.class) + private Collection<UUID> uuids; + + /** + * Empty constructor required for direct marshalling. + */ + public UUIDCollectionMessage() { + // No-op. + } + + /** + * @param uuids UUIDs to wrap. + */ + public UUIDCollectionMessage(Collection<UUID> uuids) { + this.uuids = uuids; + } + + /** + * @return The collection of UUIDs that was wrapped. + */ + public Collection<UUID> uuids() { + return uuids; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeCollection("uuids", uuids, MessageCollectionItemType.UUID)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + uuids = reader.readCollection("uuids", MessageCollectionItemType.UUID); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(UUIDCollectionMessage.class); + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 115; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 1; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e8a0f445/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java index c1d91a8..0d2cbaa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java @@ -1866,6 +1866,8 @@ public class GridFunc { assert m != null; + final boolean hasPred = p == null || p.length == 0; + return new GridSerializableMap<K, V1>() { /** Entry predicate. */ private IgnitePredicate<Entry<K, V>> ep = new P1<Map.Entry<K, V>>() { @@ -1911,7 +1913,7 @@ public class GridFunc { } @Override public int size() { - return F.size(m.keySet(), p); + return hasPred ? F.size(m.keySet(), p) : m.size(); } @SuppressWarnings({"unchecked"}) @@ -1925,13 +1927,13 @@ public class GridFunc { } @Override public boolean isEmpty() { - return !iterator().hasNext(); + return hasPred ? !iterator().hasNext() : m.isEmpty(); } }; } @Override public boolean isEmpty() { - return entrySet().isEmpty(); + return hasPred ? entrySet().isEmpty() : m.isEmpty(); } @SuppressWarnings({"unchecked"})
