Repository: ignite Updated Branches: refs/heads/gridgain-7.5.11-vk [created] 433fd031a
Support optional IO policy resolver in DataStreamer. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/08f59815 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/08f59815 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/08f59815 Branch: refs/heads/gridgain-7.5.11-vk Commit: 08f5981509711e65c0d4c6fc1209068f8958eb06 Parents: 53a8729 Author: vozerov-gridgain <[email protected]> Authored: Mon Mar 28 12:24:16 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Mon Mar 28 14:51:08 2016 +0300 ---------------------------------------------------------------------- .../managers/communication/GridIoManager.java | 43 +++++++++++++++++--- .../processors/cache/GridCacheAdapter.java | 2 +- .../datastreamer/DataStreamProcessor.java | 8 +++- .../datastreamer/DataStreamerImpl.java | 37 ++++++++++++++++- 4 files changed, 81 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/08f59815/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 3a615e6..0438b64 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -113,6 +113,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa /** Direct protocol version. */ public static final byte DIRECT_PROTO_VER = 2; + /** Current IO policy. */ + private static final ThreadLocal<Byte> CUR_PLC = new ThreadLocal<>(); + /** Listeners by topic. 
*/ private final ConcurrentMap<Object, GridMessageListener> lsnrMap = new ConcurrentHashMap8<>(); @@ -742,7 +745,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa assert obj != null; - lsnr.onMessage(nodeId, obj); + invokeListener(msg.policy(), lsnr, nodeId, obj); } finally { threadProcessingMessage(false); @@ -819,7 +822,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa assert obj != null; - lsnr.onMessage(nodeId, obj); + invokeListener(msg.policy(), lsnr, nodeId, obj); } /** @@ -1025,6 +1028,38 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } /** + * Invoke message listener, exposing the given IO policy through {@link #currentPolicy()} for the duration of the call. + * + * @param plc Policy to make current while the listener runs. + * @param lsnr Listener. + * @param nodeId Node ID. + * @param msg Message. + */ + private void invokeListener(Byte plc, GridMessageListener lsnr, UUID nodeId, Object msg) { + Byte oldPlc = CUR_PLC.get(); + + boolean change = !F.eq(oldPlc, plc); + + if (change) + CUR_PLC.set(plc); + + try { + lsnr.onMessage(nodeId, msg); + } + finally { + if (change) + CUR_PLC.set(oldPlc); + } + } + + /** + * @return IO policy set by the listener invocation running on the current thread, {@code null} if none. + */ + @Nullable public static Byte currentPolicy() { + return CUR_PLC.get(); + } + + /** * @param node Destination node. * @param topic Topic to send the message to. * @param topicOrd GridTopic enumeration ordinal. 
@@ -2246,9 +2281,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa for (GridTuple3<GridIoMessage, Long, IgniteRunnable> t = msgs.poll(); t != null; t = msgs.poll()) { try { - lsnr.onMessage( - nodeId, - t.get1().message()); + invokeListener(plc, lsnr, nodeId, t.get1().message()); } finally { if (t.get3() != null) http://git-wip-us.apache.org/repos/asf/ignite/blob/08f59815/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 54046a9..b8fcfb6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -5996,7 +5996,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V ctx.toCacheObject(val), ttl, 0, - ver); + ver.conflictVersion()); e.prepareDirectMarshal(ctx.cacheObjectContext()); http://git-wip-us.apache.org/repos/asf/ignite/blob/08f59815/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java index d899c67..c7c1f5e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java @@ -24,6 +24,7 @@ import org.apache.ignite.IgniteCheckedException; import 
org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.managers.communication.GridIoManager; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.deployment.GridDeployment; import org.apache.ignite.internal.processors.GridProcessorAdapter; @@ -339,7 +340,12 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter { DataStreamerResponse res = new DataStreamerResponse(reqId, errBytes, forceLocDep); try { - ctx.io().send(nodeId, resTopic, res, PUBLIC_POOL); + Byte plc = GridIoManager.currentPolicy(); + + if (plc == null) + plc = PUBLIC_POOL; + + ctx.io().send(nodeId, resTopic, res, plc); } catch (IgniteCheckedException e) { if (ctx.discovery().alive(nodeId)) http://git-wip-us.apache.org/repos/asf/ignite/blob/08f59815/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index 7564376..4599060 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -57,6 +57,7 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; import 
org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.deployment.GridDeployment; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; @@ -91,6 +92,7 @@ import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteUuid; @@ -109,6 +111,9 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUB */ @SuppressWarnings("unchecked") public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed { + /** Default policy resolver. */ + private static final DefaultIoPolicyResolver DFLT_IO_PLC_RSLVR = new DefaultIoPolicyResolver(); + /** Isolated receiver. */ private static final StreamReceiver ISOLATED_UPDATER = new IsolatedUpdater(); @@ -118,6 +123,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed /** */ private byte[] updaterBytes; + /** IO policy resolver for data load requests. */ + private IgniteClosure<ClusterNode, Byte> ioPlcRslvr = DFLT_IO_PLC_RSLVR; + /** Max remap count before issuing an error. */ private static final int DFLT_MAX_REMAP_CNT = 32; @@ -602,6 +610,13 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed } /** + * @param ioPlcRslvr IO policy resolver mapping a target node to the policy used for its data load requests. + */ + public void ioPolicyResolver(IgniteClosure<ClusterNode, Byte> ioPlcRslvr) { + this.ioPlcRslvr = ioPlcRslvr; + } + + /** * @param entries Entries. * @param resFut Result future. * @param activeKeys Active keys. 
@@ -1257,7 +1272,12 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed IgniteInternalFuture<Object> fut; - if (isLocNode) { + Byte plc = ioPlcRslvr.apply(node); + + if (plc == null) + plc = PUBLIC_POOL; + + if (isLocNode && plc == GridIoPolicy.PUBLIC_POOL) { fut = ctx.closure().callLocalSafe( new DataStreamerUpdateJob(ctx, log, cacheName, entries, false, skipStore, keepBinary, rcvr), false); @@ -1355,7 +1375,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed topVer); try { - ctx.io().send(node, TOPIC_DATASTREAM, req, PUBLIC_POOL); + ctx.io().send(node, TOPIC_DATASTREAM, req, plc); if (log.isDebugEnabled()) log.debug("Sent request to node [nodeId=" + node.id() + ", req=" + req + ']'); @@ -1620,4 +1640,17 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed } } } + + /** + * Default IO policy resolver: resolves every node to {@code PUBLIC_POOL}. + */ + private static class DefaultIoPolicyResolver implements IgniteClosure<ClusterNode, Byte> { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public Byte apply(ClusterNode gridNode) { + return PUBLIC_POOL; + } + } }
