IGNITE-2745: Avoid CHM lookup for listeners on default GridTopic's.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1c4cfb68 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1c4cfb68 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1c4cfb68 Branch: refs/heads/ignite-1786 Commit: 1c4cfb6867f0b8687b6def4087221822c52a0f7b Parents: 7c5db21 Author: vozerov-gridgain <[email protected]> Authored: Fri Mar 4 12:19:53 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Fri Mar 4 12:19:53 2016 +0300 ---------------------------------------------------------------------- .../managers/communication/GridIoManager.java | 183 +++++++++++++++++-- 1 file changed, 172 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1c4cfb68/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 36be9ec..232ec2e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -37,6 +37,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; + import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; @@ -116,6 +117,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa /** Listeners by topic. 
*/ private final ConcurrentMap<Object, GridMessageListener> lsnrMap = new ConcurrentHashMap8<>(); + /** System listeners. */ + private volatile GridMessageListener[] sysLsnrs; + + /** Mutex for system listeners. */ + private final Object sysLsnrsMux = new Object(); + /** Disconnect listeners. */ private final Collection<GridDisconnectListener> disconnectLsnrs = new ConcurrentLinkedQueue<>(); @@ -201,6 +208,10 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa discoDelay = ctx.config().getDiscoveryStartupDelay(); marsh = ctx.config().getMarshaller(); + + synchronized (sysLsnrsMux) { + sysLsnrs = new GridMessageListener[GridTopic.values().length]; + } } /** @@ -733,7 +744,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa try { threadProcessingMessage(true); - GridMessageListener lsnr = lsnrMap.get(msg.topic()); + GridMessageListener lsnr = listenerGet0(msg.topic()); if (lsnr == null) return; @@ -810,7 +821,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa */ @SuppressWarnings("deprecation") private void processRegularMessage0(GridIoMessage msg, UUID nodeId) { - GridMessageListener lsnr = lsnrMap.get(msg.topic()); + GridMessageListener lsnr = listenerGet0(msg.topic()); if (lsnr == null) return; @@ -823,6 +834,156 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } /** + * Get listener. + * + * @param topic Topic. + * @return Listener. + */ + @Nullable private GridMessageListener listenerGet0(Object topic) { + if (topic instanceof GridTopic) + return sysLsnrs[systemListenerIndex(topic)]; + else + return lsnrMap.get(topic); + } + + /** + * Put listener if it is absent. + * + * @param topic Topic. + * @param lsnr Listener. + * @return Old listener (if any). 
+ */ + @Nullable private GridMessageListener listenerPutIfAbsent0(Object topic, GridMessageListener lsnr) { + if (topic instanceof GridTopic) { + synchronized (sysLsnrsMux) { + int idx = systemListenerIndex(topic); + + GridMessageListener old = sysLsnrs[idx]; + + if (old == null) + changeSystemListener(idx, lsnr); + + return old; + } + } + else + return lsnrMap.putIfAbsent(topic, lsnr); + } + + /** + * Remove listener. + * + * @param topic Topic. + * @return Removed listener (if any). + */ + @Nullable private GridMessageListener listenerRemove0(Object topic) { + if (topic instanceof GridTopic) { + synchronized (sysLsnrsMux) { + int idx = systemListenerIndex(topic); + + GridMessageListener old = sysLsnrs[idx]; + + if (old != null) + changeSystemListener(idx, null); + + return old; + } + } + else + return lsnrMap.remove(topic); + } + + /** + * Remove listener if it matches expected value. + * + * @param topic Topic. + * @param expected Listener. + * @return Result. + */ + private boolean listenerRemove0(Object topic, GridMessageListener expected) { + if (topic instanceof GridTopic) { + synchronized (sysLsnrsMux) { + return systemListenerChange(topic, expected, null); + } + } + else + return lsnrMap.remove(topic, expected); + } + + /** + * Replace listener. + * + * @param topic Topic. + * @param expected Old value. + * @param newVal New value. + * @return Result. + */ + private boolean listenerReplace0(Object topic, GridMessageListener expected, GridMessageListener newVal) { + if (topic instanceof GridTopic) { + synchronized (sysLsnrsMux) { + return systemListenerChange(topic, expected, newVal); + } + } + else + return lsnrMap.replace(topic, expected, newVal); + } + + /** + * Change system listener. + * + * @param topic Topic. + * @param expected Expected value. + * @param newVal New value. + * @return Result. 
+ */ + private boolean systemListenerChange(Object topic, GridMessageListener expected, GridMessageListener newVal) { + assert Thread.holdsLock(sysLsnrsMux); + assert topic instanceof GridTopic; + + int idx = systemListenerIndex(topic); + + GridMessageListener old = sysLsnrs[idx]; + + if (old != null && old.equals(expected)) { + changeSystemListener(idx, newVal); + + return true; + } + + return false; + } + + /** + * Change system listener at the given index. + * + * @param idx Index. + * @param lsnr Listener. + */ + private void changeSystemListener(int idx, @Nullable GridMessageListener lsnr) { + assert Thread.holdsLock(sysLsnrsMux); + + GridMessageListener[] res = new GridMessageListener[sysLsnrs.length]; + + System.arraycopy(sysLsnrs, 0, res, 0, sysLsnrs.length); + + res[idx] = lsnr; + + sysLsnrs = res; + } + + /** + * Get index of a system listener. + * + * @param topic Topic. + * @return Index. + */ + private int systemListenerIndex(Object topic) { + assert topic instanceof GridTopic; + + return ((GridTopic)topic).ordinal(); + } + + /** * @param nodeId Node ID. * @param msg Ordered message. * @param plc Execution policy. @@ -928,7 +1089,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (isNew && set.endTime() != Long.MAX_VALUE) ctx.timeout().addTimeoutObject(set); - final GridMessageListener lsnr = lsnrMap.get(msg.topic()); + final GridMessageListener lsnr = listenerGet0(msg.topic()); if (lsnr == null) { if (closedTopics.contains(msg.topic())) { @@ -1537,7 +1698,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa GridMessageListener lsnrs; for (;;) { - lsnrs = lsnrMap.putIfAbsent(topic, lsnr); + lsnrs = listenerPutIfAbsent0(topic, lsnr); if (lsnrs == null) { lsnrs = lsnr; @@ -1550,7 +1711,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (!(lsnrs instanceof ArrayListener)) { // We are putting the second listener, creating array. 
GridMessageListener arrLsnr = new ArrayListener(lsnrs, lsnr); - if (lsnrMap.replace(topic, lsnrs, arrLsnr)) { + if (listenerReplace0(topic, lsnrs, arrLsnr)) { lsnrs = arrLsnr; break; @@ -1561,7 +1722,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa break; // Add operation failed because array is already empty and is about to be removed, helping and retrying. - lsnrMap.remove(topic, lsnrs); + listenerRemove0(topic, lsnrs); } } @@ -1639,7 +1800,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (lsnr == null) { closedTopics.add(topic); - lsnr = lsnrMap.remove(topic); + lsnr = listenerRemove0(topic); rmv = lsnr != null; @@ -1650,7 +1811,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } else { for (;;) { - GridMessageListener lsnrs = lsnrMap.get(topic); + GridMessageListener lsnrs = listenerGet0(topic); // If removing listener before subscription happened. if (lsnrs == null) { @@ -1670,7 +1831,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (!(lsnrs instanceof ArrayListener)) { if (lsnrs.equals(lsnr)) { - if (!lsnrMap.remove(topic, lsnrs)) + if (!listenerRemove0(topic, lsnrs)) continue; // Retry because it can be packed to array listener. empty = true; @@ -1688,7 +1849,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa rmv = false; if (empty) - lsnrMap.remove(topic, lsnrs); + listenerRemove0(topic, lsnrs); } // If removing last subscribed listener. @@ -2132,7 +2293,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa /** {@inheritDoc} */ @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") @Override public void onTimeout() { - GridMessageListener lsnr = lsnrMap.get(topic); + GridMessageListener lsnr = listenerGet0(topic); if (lsnr != null) { long delta = 0;
