Repository: ignite Updated Branches: refs/heads/gg-12133 [created] ecd9d1c1e
gg-12133 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ecd9d1c1 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ecd9d1c1 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ecd9d1c1 Branch: refs/heads/gg-12133 Commit: ecd9d1c1ea512add29a1d27df9f6c608e8b9d7c9 Parents: b1736c0 Author: sboikov <[email protected]> Authored: Mon Jun 26 12:39:30 2017 +0300 Committer: sboikov <[email protected]> Committed: Mon Jun 26 12:39:30 2017 +0300 ---------------------------------------------------------------------- .../checkpoint/GridCheckpointManager.java | 2 +- .../managers/communication/GridIoManager.java | 8 +- .../communication/GridMessageListener.java | 3 +- .../deployment/GridDeploymentCommunication.java | 4 +- .../eventstorage/GridEventStorageManager.java | 4 +- .../processors/cache/GridCacheIoManager.java | 37 ++-- .../cache/transactions/IgniteTxManager.java | 2 +- .../clock/GridClockSyncProcessor.java | 2 +- .../continuous/GridContinuousProcessor.java | 4 +- .../datastreamer/DataStreamProcessor.java | 2 +- .../datastreamer/DataStreamerImpl.java | 2 +- .../processors/igfs/IgfsDataManager.java | 2 +- .../igfs/IgfsFragmentizerManager.java | 4 +- .../processors/job/GridJobProcessor.java | 8 +- .../handlers/task/GridTaskCommandHandler.java | 4 +- .../processors/task/GridTaskProcessor.java | 6 +- .../jobstealing/JobStealingCollisionSpi.java | 2 +- .../internal/TestRecordingCommunicationSpi.java | 29 +++ ...idCommunicationManagerListenersSelfTest.java | 2 +- .../GridCommunicationSendMessageSelfTest.java | 2 +- .../cache/GridCachePartitionedGetSelfTest.java | 2 +- .../IgniteBinaryMetadataUpdateFromInvoke.java | 187 +++++++++++++++++++ .../communication/GridIoManagerBenchmark.java | 4 +- .../communication/GridIoManagerBenchmark0.java | 12 +- .../communication/GridCacheMessageSelfTest.java | 2 +- .../testframework/GridSpiTestContext.java | 4 +- .../IgniteCacheRestartTestSuite2.java | 2 + .../query/h2/opt/GridH2IndexBase.java | 2 +- .../query/h2/twostep/GridMapQueryExecutor.java | 2 +- .../h2/twostep/GridReduceQueryExecutor.java | 2 +- 30 files changed, 288 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java index 9124caf..40c2c63 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java @@ -449,7 +449,7 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> { * @param msg Received message. */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { GridCheckpointRequest req = (GridCheckpointRequest)msg; if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 3df29cf..fe89d37 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -1079,7 +1079,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa CUR_PLC.set(plc); try { - lsnr.onMessage(nodeId, msg); + lsnr.onMessage(nodeId, msg, plc); } finally { if (change) @@ -1905,14 +1905,14 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param nodeId Node ID. * @param msg Message. */ - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { GridMessageListener[] arr0 = arr; if (arr0 == null) return; for (GridMessageListener l : arr0) - l.onMessage(nodeId, msg); + l.onMessage(nodeId, msg, plc); } /** @@ -2012,7 +2012,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa /** {@inheritDoc} */ @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter", "ConstantConditions", "OverlyStrongTypeCast"}) - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { if (!(msg instanceof GridIoUserMessage)) { U.error(log, "Received unknown message (potentially fatal problem): " + msg); http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridMessageListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridMessageListener.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridMessageListener.java index 3993591..c7de57c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridMessageListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridMessageListener.java @@ -30,6 +30,7 @@ public interface GridMessageListener extends EventListener { * @param nodeId ID of node that sent the message. Note that may have already * left topology by the time this message is received. * @param msg Message received. + * @param plc Message policy (pool). */ - public void onMessage(UUID nodeId, Object msg); + public void onMessage(UUID nodeId, Object msg, byte plc); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java index a571ae4..661bf68 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java @@ -82,7 +82,7 @@ class GridDeploymentCommunication { this.log = log.getLogger(getClass()); peerLsnr = new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { processDeploymentRequest(nodeId, msg); } }; @@ -416,7 +416,7 @@ class GridDeploymentCommunication { }; GridMessageListener resLsnr = new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { assert nodeId != null; assert msg != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java index 607bb96..b77ec27 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java @@ -877,7 +877,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> GridMessageListener resLsnr = new GridMessageListener() { @SuppressWarnings("deprecation") - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { assert nodeId != null; assert msg != null; @@ -1054,7 +1054,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> */ private class RequestListener implements GridMessageListener { /** {@inheritDoc} */ - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { assert nodeId != null; assert msg != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 924ce79..39087b6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -33,6 +33,7 @@ import org.apache.ignite.binary.BinaryObjectException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -127,7 +128,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { /** Message listener. */ private GridMessageListener lsnr = new GridMessageListener() { - @Override public void onMessage(final UUID nodeId, final Object msg) { + @Override public void onMessage(final UUID nodeId, final Object msg, final byte plc) { if (log.isDebugEnabled()) log.debug("Received unordered cache communication message [nodeId=" + nodeId + ", locId=" + cctx.localNodeId() + ", msg=" + msg + ']'); @@ -206,6 +207,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { log.debug(msg0.toString()); } + if (plc == GridIoPolicy.UTILITY_CACHE_POOL) + rmtAffVer = new AffinityTopologyVersion(cctx.localNode().order()); + fut = cctx.exchange().affinityReadyFuture(rmtAffVer); } } @@ -213,22 +217,27 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { if (fut != null && !fut.isDone()) { fut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> t) { - cctx.kernalContext().closure().runLocalSafe(new Runnable() { - @Override public void run() { - IgniteLogger log = cacheMsg.messageLogger(cctx); + try { + cctx.kernalContext().pools().poolForPolicy(plc).execute(new Runnable() { + @Override public void run() { + IgniteLogger log = cacheMsg.messageLogger(cctx); - if (log.isDebugEnabled()) { - StringBuilder msg0 = new StringBuilder("Process cache message after wait for " + - "affinity topology version ["); + if (log.isDebugEnabled()) { + StringBuilder msg0 = new StringBuilder("Process cache message after wait for " + + "affinity topology version ["); - appendMessageInfo(cacheMsg, nodeId, msg0).append(']'); + appendMessageInfo(cacheMsg, nodeId, msg0).append(']'); - log.debug(msg0.toString()); - } + log.debug(msg0.toString()); + } - handleMessage(nodeId, cacheMsg); - } - }); + handleMessage(nodeId, cacheMsg); + } + }); + } + catch (IgniteCheckedException e) { + U.error(cacheMsg.messageLogger(cctx), "Failed to get pool for policy: " + plc, e); + } } }); @@ -1336,7 +1345,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { /** {@inheritDoc} */ @SuppressWarnings({"CatchGenericClass", "unchecked"}) - @Override public void onMessage(final UUID nodeId, Object msg) { + @Override public void onMessage(final UUID nodeId, Object msg, byte plc) { if (log.isDebugEnabled()) log.debug("Received cache ordered message [nodeId=" + nodeId + ", msg=" + msg + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 2c02f96..2f21614 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -2472,7 +2472,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { private class DeadlockDetectionListener implements GridMessageListener { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { GridCacheMessage cacheMsg = (GridCacheMessage)msg; unmarshall(nodeId, cacheMsg); http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java index 0764316..3586956 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java @@ -96,7 +96,7 @@ public class GridClockSyncProcessor extends GridProcessorAdapter { srv.start(ctx); ctx.io().addMessageListener(TOPIC_TIME_SYNC, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { assert msg instanceof GridClockDeltaSnapshotMessage; GridClockDeltaSnapshotMessage msg0 = (GridClockDeltaSnapshotMessage)msg; http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 9fd9b6d..f0429bc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -288,7 +288,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { }); ctx.io().addMessageListener(TOPIC_CONTINUOUS, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object obj) { + @Override public void onMessage(UUID nodeId, Object obj, byte plc) { GridContinuousMessage msg = (GridContinuousMessage)obj; if (msg.data() == null && msg.dataBytes() != null) { @@ -721,7 +721,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { private void registerMessageListener(GridContinuousHandler hnd) { if (hnd.orderedTopic() != null) { ctx.io().addMessageListener(hnd.orderedTopic(), new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object obj) { + @Override public void onMessage(UUID nodeId, Object obj, byte plc) { GridContinuousMessage msg = (GridContinuousMessage)obj; // Only notification can be ordered. http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java index fee4dd6..6f35a52 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java @@ -80,7 +80,7 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter { if (!ctx.clientNode()) { ctx.io().addMessageListener(TOPIC_DATASTREAM, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { assert msg instanceof DataStreamerRequest; processRequest(nodeId, (DataStreamerRequest)msg); http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index bb9ffdd..515314e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -321,7 +321,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed topic = TOPIC_DATASTREAM.topic(IgniteUuid.fromUuid(ctx.localNodeId())); ctx.io().addMessageListener(topic, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { assert msg instanceof DataStreamerResponse; DataStreamerResponse res = (DataStreamerResponse)msg; http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java index e534800..59e1b72 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java @@ -153,7 +153,7 @@ public class IgfsDataManager extends IgfsManager { topic = F.isEmpty(igfsName) ? TOPIC_IGFS : TOPIC_IGFS.topic(igfsName); igfsCtx.kernalContext().io().addMessageListener(topic, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { if (msg instanceof IgfsBlocksMessage) processBlocksMessage(nodeId, (IgfsBlocksMessage)msg); else if (msg instanceof IgfsAckMessage) http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java index 2e82f33..f76b877 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java @@ -453,7 +453,7 @@ public class IgfsFragmentizerManager extends IgfsManager { } /** {@inheritDoc} */ - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { if (msg instanceof IgfsFragmentizerResponse) { IgfsFragmentizerResponse res = (IgfsFragmentizerResponse)msg; @@ -673,7 +673,7 @@ public class IgfsFragmentizerManager extends IgfsManager { } /** {@inheritDoc} */ - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { if (msg instanceof IgfsFragmentizerRequest || msg instanceof IgfsSyncMessage) { if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java index ea9cbd7..eb3300c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java @@ -444,7 +444,7 @@ public class GridJobProcessor extends GridProcessorAdapter { final Condition cond = lock.newCondition(); GridMessageListener msgLsnr = new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { String err = null; GridJobSiblingsResponse res = null; @@ -1842,7 +1842,7 @@ public class GridJobProcessor extends GridProcessorAdapter { */ private class JobSessionListener implements GridMessageListener { /** {@inheritDoc} */ - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { assert nodeId != null; assert msg != null; @@ -1858,7 +1858,7 @@ public class GridJobProcessor extends GridProcessorAdapter { */ private class JobCancelListener implements GridMessageListener { /** {@inheritDoc} */ - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { assert nodeId != null; assert msg != null; @@ -1876,7 +1876,7 @@ public class GridJobProcessor extends GridProcessorAdapter { */ private class JobExecutionListener implements GridMessageListener { /** {@inheritDoc} */ - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { assert nodeId != null; assert msg != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java index 947435c..88f8d4d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java @@ -109,7 +109,7 @@ public class GridTaskCommandHandler extends GridRestCommandHandlerAdapter { super(ctx); ctx.io().addMessageListener(TOPIC_REST, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { if (!(msg instanceof GridTaskResultRequest)) { U.warn(log, "Received unexpected message instead of task result request: " + msg); @@ -425,7 +425,7 @@ public class GridTaskCommandHandler extends GridRestCommandHandlerAdapter { final Condition cond = lock.newCondition(); GridMessageListener msgLsnr = new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { String err = null; GridTaskResultResponse res = null; http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java index 9356864..7680e22 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java @@ -1223,7 +1223,7 @@ public class GridTaskProcessor extends GridProcessorAdapter { } /** {@inheritDoc} */ - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { if (msg instanceof GridJobExecuteResponse) processJobExecuteResponse(nodeId, (GridJobExecuteResponse)msg); else if (jobResOnly) @@ -1267,7 +1267,7 @@ public class GridTaskProcessor extends GridProcessorAdapter { */ private class JobSiblingsMessageListener implements GridMessageListener { /** {@inheritDoc} */ - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { if (!(msg instanceof GridJobSiblingsRequest)) { U.warn(log, "Received unexpected message instead of siblings request: " + msg); @@ -1339,7 +1339,7 @@ public class GridTaskProcessor extends GridProcessorAdapter { */ private class TaskCancelMessageListener implements GridMessageListener { /** {@inheritDoc} */ - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { assert msg != null; if (!(msg instanceof GridTaskCancelRequest)) { http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java index f778bfc..fca2498 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java @@ -547,7 +547,7 @@ public class JobStealingCollisionSpi extends IgniteSpiAdapter implements Collisi spiCtx.addMessageListener( msgLsnr = new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { MessageInfo info = rcvMsgMap.get(nodeId); if (info == null) { http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java index 2aed459..17ca1a7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java @@ -88,6 +88,8 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi { blockedMsgs.add(new T2<>(node, ioMsg)); + this.notifyAll(); + return; } } @@ -137,6 +139,33 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi { } /** + * @param cls Message class. + * @param nodeName Node name. + * @throws InterruptedException If interrupted. + */ + public void waitForBlocked(Class<?> cls, String nodeName) throws InterruptedException { + synchronized (this) { + while (!hasMessage(cls, nodeName)) + wait(); + } + } + + /** + * @param cls Message class. + * @param nodeName Node name. + * @return {@code True} if has blocked message. + */ + private boolean hasMessage(Class<?> cls, String nodeName) { + for (T2<ClusterNode, GridIoMessage> msg : blockedMsgs) { + if (msg.get2().message().getClass() == cls && + nodeName.equals(msg.get1().attribute(ATTR_GRID_NAME))) + return true; + } + + return false; + } + + /** * @param blockP Message block predicate. */ public void blockMessages(IgnitePredicate<GridIoMessage> blockP) { http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationManagerListenersSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationManagerListenersSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationManagerListenersSelfTest.java index 7613543..9289f86 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationManagerListenersSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationManagerListenersSelfTest.java @@ -159,7 +159,7 @@ public class GridCommunicationManagerListenersSelfTest extends GridCommonAbstrac } /** {@inheritDoc} */ - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { // No-op. } } http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java index 8503b48..08ee347 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java @@ -121,7 +121,7 @@ public class GridCommunicationSendMessageSelfTest extends GridCommonAbstractTest final CountDownLatch latch = new CountDownLatch(SAMPLE_CNT); mgr1.addMessageListener(topic, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { latch.countDown(); } }); http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java index 2cb4a00..47d6092 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java @@ -226,7 +226,7 @@ public class GridCachePartitionedGetSelfTest extends GridCommonAbstractTest { ((IgniteKernal)g).context().io().addMessageListener( TOPIC_CACHE, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { info("Received message from node [nodeId=" + nodeId + ", msg=" + msg + ']'); if (msg instanceof GridNearSingleGetRequest) { http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteBinaryMetadataUpdateFromInvoke.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteBinaryMetadataUpdateFromInvoke.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteBinaryMetadataUpdateFromInvoke.java new file mode 100644 index 0000000..f919ca7 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteBinaryMetadataUpdateFromInvoke.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheEntryProcessor; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.binary.BinaryMarshaller; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class IgniteBinaryMetadataUpdateFromInvoke extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + cfg.setMarshaller(null); + + cfg.setClientMode(client); + + CacheConfiguration ccfg = new CacheConfiguration("cache"); + + ccfg.setAtomicityMode(ATOMIC); + ccfg.setBackups(1); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + + cfg.setCacheConfiguration(ccfg); + + cfg.setCommunicationSpi(new TestRecordingCommunicationSpi()); + + cfg.setConsistentId(gridName); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testMetadataUpdateFromAtomicInvoke() throws Exception { + for (int i = 0; i < 2; i++) { + client = false; + + log.info("Iteration: " + i); + + final int SRVS = 4; + + startGrids(SRVS); + + awaitPartitionMapExchange(); + + final CyclicBarrier b = new CyclicBarrier(3); + + final AtomicBoolean stop = new AtomicBoolean(); + + final int META_PRIMARY_NODE = 0; + final int META_UPDATE_FROM_NODE = 1; + + BinaryMarshaller marsh = (BinaryMarshaller)ignite(META_PRIMARY_NODE).configuration().getMarshaller(); + + marsh.getContext().registerClass( + marsh.binaryMarshaller().context().typeId(TestEnum1.class.getName()), TestEnum1.class); + + TestRecordingCommunicationSpi testSpi = + (TestRecordingCommunicationSpi)ignite(META_UPDATE_FROM_NODE).configuration().getCommunicationSpi(); + + testSpi.blockMessages(GridNearTxPrepareRequest.class, getTestGridName(META_PRIMARY_NODE)); + + IgniteInternalFuture<?> sFut1 = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + IgniteCache<Object, Object> cache = ignite(META_UPDATE_FROM_NODE).cache("cache"); + + List<Integer> keys = primaryKeys(cache, 1); + + b.await(); + + cache.invoke(keys.get(0), new TestEntryProcessor()); + + return null; + } + }, "async-node-0"); + + IgniteInternalFuture<?> sFut2 = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + IgniteCache<Object, Object> cache = ignite(META_PRIMARY_NODE).cache("cache"); + + List<Integer> keys = primaryKeys(cache, 1); + + b.await(); + + Thread.sleep(2000); + + cache.invoke(keys.get(0), new TestEntryProcessor()); + + return null; + } + }, "async-node-1"); + + IgniteInternalFuture<?> fut2 = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + client = true; + + b.await(); + + for (int i = 0; i < 1; i++) + startGrid(SRVS + i); + + stop.set(true); + + return null; + } + }); + + testSpi.waitForBlocked(GridNearTxPrepareRequest.class, getTestGridName(META_PRIMARY_NODE)); + + U.sleep(5000); + + testSpi.stopBlock(); + + sFut1.get(); + sFut2.get(); + fut2.get(); + + stopAllGrids(); + } + } + + /** + * + */ + static class TestEntryProcessor implements CacheEntryProcessor<Object, Object, Object> { + @Override public Object process(MutableEntry<Object, Object> e, Object... args) { + e.setValue(TestEnum1.ENUM); + + return null; + } + } + + /** + * + */ + enum TestEnum1 { + /** */ + ENUM + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java index 723495c..293e578 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java @@ -240,7 +240,7 @@ public class GridIoManagerBenchmark { GridMessageListener lsnr = new GridMessageListener() { private ClusterNode node; - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { if (node == null) node = g.context().discovery().node(nodeId); @@ -336,7 +336,7 @@ public class GridIoManagerBenchmark { */ private static class SenderMessageListener implements GridMessageListener { /** {@inheritDoc} */ - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { msgCntr.increment(); if (testLatency) http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java index f2c6255..83efd7d 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java @@ -130,7 +130,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest { rcv.addMessageListener( topic, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { try { rcv.send(sndNode, topic, (Message)msg, PUBLIC_POOL); } @@ -141,7 +141,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest { }); snd.addMessageListener(topic, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { msgCntr.increment(); sem.release(); @@ -224,7 +224,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest { rcv.addMessageListener( topic, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { try { rcv.send(sndNode, topic, (Message)msg, PUBLIC_POOL); } @@ -235,7 +235,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest { }); snd.addMessageListener(topic, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { map.get(((GridTestMessage)msg).id()).countDown(); } }); @@ -324,7 +324,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest { rcv.addMessageListener( topic, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { try { rcv.send(sndNode, topic, (Message)msg, PUBLIC_POOL); } @@ -335,7 +335,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest { }); snd.addMessageListener(topic, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { msgCntr.increment(); sem.release(); http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java index 9c97542..2d04db2 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java @@ -139,7 +139,7 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest { final CountDownLatch latch = new CountDownLatch(SAMPLE_CNT); mgr1.addMessageListener(topic, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { try { latch.countDown(); http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java index 1c8acbc..0c04039 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java @@ -328,7 +328,7 @@ public class GridSpiTestContext implements IgniteSpiContext { @SuppressWarnings("deprecation") public void triggerMessage(ClusterNode node, Object msg) { for (GridMessageListener lsnr : msgLsnrs) - lsnr.onMessage(node.id(), msg); + lsnr.onMessage(node.id(), msg, (byte)0); } /** {@inheritDoc} */ @@ -667,7 +667,7 @@ public class GridSpiTestContext implements IgniteSpiContext { @SuppressWarnings({ "SynchronizationOnLocalVariableOrMethodParameter", "ConstantConditions", "OverlyStrongTypeCast"}) - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { GridIoUserMessage ioMsg = (GridIoUserMessage)msg; ClusterNode node = locNode; http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java index 0513786..fb38c55 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java @@ -19,6 +19,7 @@ package org.apache.ignite.testsuites; import junit.framework.TestSuite; import org.apache.ignite.internal.processors.cache.GridCachePutAllFailoverSelfTest; +import org.apache.ignite.internal.processors.cache.IgniteBinaryMetadataUpdateFromInvoke; import org.apache.ignite.internal.processors.cache.IgniteCacheAtomicPutAllFailoverSelfTest; import org.apache.ignite.internal.processors.cache.IgniteCachePutAllRestartTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteBinaryMetadataUpdateNodeRestartTest; @@ -45,6 +46,7 @@ public class IgniteCacheRestartTestSuite2 extends TestSuite { suite.addTestSuite(GridCachePutAllFailoverSelfTest.class); suite.addTestSuite(IgniteBinaryMetadataUpdateNodeRestartTest.class); + suite.addTestSuite(IgniteBinaryMetadataUpdateFromInvoke.class); suite.addTestSuite(IgniteCacheGetRestartTest.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java index c29239f..22b94c7 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java @@ -138,7 +138,7 @@ public abstract class GridH2IndexBase extends BaseIndex { msgTopic = new IgniteBiTuple<>(GridTopic.TOPIC_QUERY, tbl.identifier() + '.' + getName()); msgLsnr = new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { GridSpinBusyLock l = desc.indexing().busyLock(); if (!l.enterBusy()) http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index ac1a6a6..0605287 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -167,7 +167,7 @@ public class GridMapQueryExecutor { }, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT); ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { if (!busyLock.enterBusy()) return; http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 3f886ee..71c0e12 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -190,7 +190,7 @@ public class GridReduceQueryExecutor { log = ctx.log(GridReduceQueryExecutor.class); ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { if (!busyLock.enterBusy()) return;
