Repository: ignite
Updated Branches:
  refs/heads/gridgain-7.5.11-vk [created] 433fd031a


Support optional IO policy resolver in DataStreamer.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/08f59815
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/08f59815
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/08f59815

Branch: refs/heads/gridgain-7.5.11-vk
Commit: 08f5981509711e65c0d4c6fc1209068f8958eb06
Parents: 53a8729
Author: vozerov-gridgain <[email protected]>
Authored: Mon Mar 28 12:24:16 2016 +0300
Committer: vozerov-gridgain <[email protected]>
Committed: Mon Mar 28 14:51:08 2016 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java   | 43 +++++++++++++++++---
 .../processors/cache/GridCacheAdapter.java      |  2 +-
 .../datastreamer/DataStreamProcessor.java       |  8 +++-
 .../datastreamer/DataStreamerImpl.java          | 37 ++++++++++++++++-
 4 files changed, 81 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/08f59815/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 3a615e6..0438b64 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -113,6 +113,9 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
     /** Direct protocol version. */
     public static final byte DIRECT_PROTO_VER = 2;
 
+    /** Current IO policy. */
+    private static final ThreadLocal<Byte> CUR_PLC = new ThreadLocal<>();
+
     /** Listeners by topic. */
     private final ConcurrentMap<Object, GridMessageListener> lsnrMap = new 
ConcurrentHashMap8<>();
 
@@ -742,7 +745,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
 
                     assert obj != null;
 
-                    lsnr.onMessage(nodeId, obj);
+                    invokeListener(msg.policy(), lsnr, nodeId, obj);
                 }
                 finally {
                     threadProcessingMessage(false);
@@ -819,7 +822,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
 
         assert obj != null;
 
-        lsnr.onMessage(nodeId, obj);
+        invokeListener(msg.policy(), lsnr, nodeId, obj);
     }
 
     /**
@@ -1025,6 +1028,38 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
     }
 
     /**
+     * Invoke message listener with the given policy published as the calling thread's current IO policy.
+     * The previous thread-local value is put back once the listener returns or throws.
+     *
+     * @param plc Policy.
+     * @param lsnr Listener.
+     * @param nodeId Node ID.
+     * @param msg Message.
+     */
+    private void invokeListener(Byte plc, GridMessageListener lsnr, UUID 
nodeId, Object msg) {
+        Byte oldPlc = CUR_PLC.get();
+
+        boolean change = !F.eq(oldPlc, plc); // Swap the thread-local only when the policy actually differs.
+
+        if (change)
+            CUR_PLC.set(plc);
+
+        try {
+            lsnr.onMessage(nodeId, msg);
+        }
+        finally {
+            if (change)
+                CUR_PLC.set(oldPlc);
+        }
+    }
+
+    /**
+     * @return IO policy held in the {@code CUR_PLC} thread-local for the calling thread, or {@code null} if none is set.
+     */
+    @Nullable public static Byte currentPolicy() {
+        return CUR_PLC.get();
+    }
+
+    /**
      * @param node Destination node.
      * @param topic Topic to send the message to.
      * @param topicOrd GridTopic enumeration ordinal.
@@ -2246,9 +2281,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
 
             for (GridTuple3<GridIoMessage, Long, IgniteRunnable> t = 
msgs.poll(); t != null; t = msgs.poll()) {
                 try {
-                    lsnr.onMessage(
-                        nodeId,
-                        t.get1().message());
+                    invokeListener(plc, lsnr, nodeId, t.get1().message());
                 }
                 finally {
                     if (t.get3() != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/08f59815/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 54046a9..b8fcfb6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -5996,7 +5996,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
                 ctx.toCacheObject(val),
                 ttl,
                 0,
-                ver);
+                ver.conflictVersion());
 
             e.prepareDirectMarshal(ctx.cacheObjectContext());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/08f59815/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index d899c67..c7c1f5e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -24,6 +24,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
@@ -339,7 +340,12 @@ public class DataStreamProcessor<K, V> extends 
GridProcessorAdapter {
         DataStreamerResponse res = new DataStreamerResponse(reqId, errBytes, 
forceLocDep);
 
         try {
-            ctx.io().send(nodeId, resTopic, res, PUBLIC_POOL);
+            Byte plc = GridIoManager.currentPolicy();
+
+            if (plc == null)
+                plc = PUBLIC_POOL;
+
+            ctx.io().send(nodeId, resTopic, res, plc);
         }
         catch (IgniteCheckedException e) {
             if (ctx.discovery().alive(nodeId))

http://git-wip-us.apache.org/repos/asf/ignite/blob/08f59815/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 7564376..4599060 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -57,6 +57,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import 
org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
@@ -91,6 +92,7 @@ import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteUuid;
@@ -109,6 +111,9 @@ import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.PUB
  */
 @SuppressWarnings("unchecked")
 public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, 
Delayed {
+    /** Default policy resolver. */
+    private static final DefaultIoPolicyResolver DFLT_IO_PLC_RSLVR = new 
DefaultIoPolicyResolver();
+
     /** Isolated receiver. */
     private static final StreamReceiver ISOLATED_UPDATER = new 
IsolatedUpdater();
 
@@ -118,6 +123,9 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
     /** */
     private byte[] updaterBytes;
 
+    /** IO policy resolver for data load request. */
+    private IgniteClosure<ClusterNode, Byte> ioPlcRslvr = DFLT_IO_PLC_RSLVR;
+
     /** Max remap count before issuing an error. */
     private static final int DFLT_MAX_REMAP_CNT = 32;
 
@@ -602,6 +610,13 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
     }
 
     /**
+     * @param ioPlcRslvr IO policy resolver: maps a node to the IO policy used to send requests to it.
+     */
+    public void ioPolicyResolver(IgniteClosure<ClusterNode, Byte> ioPlcRslvr) {
+        this.ioPlcRslvr = ioPlcRslvr;
+    }
+
+    /**
      * @param entries Entries.
      * @param resFut Result future.
      * @param activeKeys Active keys.
@@ -1257,7 +1272,12 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
 
             IgniteInternalFuture<Object> fut;
 
-            if (isLocNode) {
+            Byte plc = ioPlcRslvr.apply(node);
+
+            if (plc == null)
+                plc = PUBLIC_POOL;
+
+            if (isLocNode && plc == GridIoPolicy.PUBLIC_POOL) {
                 fut = ctx.closure().callLocalSafe(
                     new DataStreamerUpdateJob(ctx, log, cacheName, entries, 
false, skipStore, keepBinary, rcvr), false);
 
@@ -1355,7 +1375,7 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
                     topVer);
 
                 try {
-                    ctx.io().send(node, TOPIC_DATASTREAM, req, PUBLIC_POOL);
+                    ctx.io().send(node, TOPIC_DATASTREAM, req, plc);
 
                     if (log.isDebugEnabled())
                         log.debug("Sent request to node [nodeId=" + node.id() 
+ ", req=" + req + ']');
@@ -1620,4 +1640,17 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
             }
         }
     }
+
+    /**
+     * Default IO policy resolver: returns {@code PUBLIC_POOL} for every node.
+     */
+    private static class DefaultIoPolicyResolver implements 
IgniteClosure<ClusterNode, Byte> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public Byte apply(ClusterNode gridNode) {
+            return PUBLIC_POOL;
+        }
+    }
 }

Reply via email to