Merge master into ignite-843
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ae09fa9f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ae09fa9f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ae09fa9f Branch: refs/heads/ignite-843 Commit: ae09fa9f26b9171636f22cdc11c61a77ad3cd4f4 Parents: 49514e8 1223525 Author: Andrey <[email protected]> Authored: Mon Oct 12 16:28:55 2015 +0700 Committer: Andrey <[email protected]> Committed: Mon Oct 12 16:29:06 2015 +0700 ---------------------------------------------------------------------- .../discovery/GridDiscoveryManager.java | 479 ++++++------ .../processors/cache/GridCacheProcessor.java | 157 ++-- .../cache/query/GridCacheQueryManager.java | 719 +++++++++---------- .../processors/rest/GridRestProcessor.java | 176 +++-- .../handlers/cache/GridCacheCommandHandler.java | 313 ++++---- .../handlers/query/QueryCommandHandler.java | 156 ++-- .../top/GridTopologyCommandHandler.java | 18 +- .../rest/request/RestQueryRequest.java | 26 +- .../http/jetty/GridJettyRestHandler.java | 151 ++-- 9 files changed, 1055 insertions(+), 1140 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ae09fa9f/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 6aba211,9e54f6f..b74fbdb --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@@ -134,33 -134,33 +134,24 @@@ import static org.apache.ignite.plugin. * Discovery SPI manager. */ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { ++ /** Discovery cached history size. */ ++ protected static final int DISCOVERY_HISTORY_SIZE = 100; /** Fake key for {@code null}-named caches. Used inside {@link DiscoCache}. */ private static final String NULL_CACHE_NAME = UUID.randomUUID().toString(); -- /** Metrics update frequency. */ private static final long METRICS_UPDATE_FREQ = 3000; -- /** */ private static final MemoryMXBean mem = ManagementFactory.getMemoryMXBean(); -- /** */ private static final OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean(); -- /** */ private static final RuntimeMXBean rt = ManagementFactory.getRuntimeMXBean(); -- /** */ private static final ThreadMXBean threads = ManagementFactory.getThreadMXBean(); -- /** */ private static final Collection<GarbageCollectorMXBean> gc = ManagementFactory.getGarbageCollectorMXBeans(); -- /** */ private static final String PREFIX = "Topology snapshot"; -- -- /** Discovery cached history size. */ -- protected static final int DISCOVERY_HISTORY_SIZE = 100; -- /** Predicate filtering out daemon nodes. */ private static final IgnitePredicate<ClusterNode> daemonFilter = new P1<ClusterNode>() { @Override public boolean apply(ClusterNode n) { @@@ -185,88 -185,88 +176,83 @@@ /** Discovery event worker. */ private final DiscoveryWorker discoWrk = new DiscoveryWorker(); -- ++ /** Last logged topology. */ ++ private final AtomicLong lastLoggedTop = new AtomicLong(); ++ /** Last segment check result. */ ++ private final AtomicBoolean lastSegChkRes = new AtomicBoolean(true); ++ /** Topology cache history. */ ++ private final Map<AffinityTopologyVersion, DiscoCache> discoCacheHist = ++ new GridBoundedConcurrentOrderedMap<>(DISCOVERY_HISTORY_SIZE); ++ /** Topology version. */ ++ private final AtomicReference<Snapshot> topSnap = ++ new AtomicReference<>(new Snapshot(AffinityTopologyVersion.ZERO, null)); ++ /** */ ++ private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); ++ /** Received custom messages history. */ ++ private final ArrayDeque<IgniteUuid> rcvdCustomMsgs = new ArrayDeque<>(); ++ /** */ ++ private final CountDownLatch startLatch = new CountDownLatch(1); /** Network segment check worker. */ private SegmentCheckWorker segChkWrk; -- /** Network segment check thread. */ private IgniteThread segChkThread; -- -- /** Last logged topology. */ -- private final AtomicLong lastLoggedTop = new AtomicLong(); -- /** Local node. */ private ClusterNode locNode; -- /** Local node daemon flag. */ private boolean isLocDaemon; -- /** {@code True} if resolvers were configured and network segment check is enabled. */ private boolean hasRslvrs; -- -- /** Last segment check result. */ -- private final AtomicBoolean lastSegChkRes = new AtomicBoolean(true); -- -- /** Topology cache history. */ -- private final Map<AffinityTopologyVersion, DiscoCache> discoCacheHist = -- new GridBoundedConcurrentOrderedMap<>(DISCOVERY_HISTORY_SIZE); -- /** Topology snapshots history. */ private volatile Map<Long, Collection<ClusterNode>> topHist = new HashMap<>(); -- -- /** Topology version. */ -- private final AtomicReference<Snapshot> topSnap = -- new AtomicReference<>(new Snapshot(AffinityTopologyVersion.ZERO, null)); -- /** Minor topology version. */ private int minorTopVer; -- /** Order supported flag. */ private boolean discoOrdered; -- /** Topology snapshots history supported flag. */ private boolean histSupported; -- /** Configured network segment check frequency. */ private long segChkFreq; -- /** Local node join to topology event. */ private GridFutureAdapter<DiscoveryEvent> locJoinEvt = new GridFutureAdapter<>(); -- /** GC CPU load. */ private volatile double gcCpuLoad; -- /** CPU load. */ private volatile double cpuLoad; -- /** Metrics. */ private final GridLocalMetrics metrics = createMetrics(); -- /** Metrics update worker. */ private GridTimeoutProcessor.CancelableTask metricsUpdateTask; -- /** Custom event listener. */ private ConcurrentMap<Class<?>, List<CustomEventListener<DiscoveryCustomMessage>>> customEvtLsnrs = new ConcurrentHashMap8<>(); -- /** Map of dynamic cache filters. */ private Map<String, CachePredicate> registeredCaches = new HashMap<>(); -- /** */ -- private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); -- -- /** Received custom messages history. */ -- private final ArrayDeque<IgniteUuid> rcvdCustomMsgs = new ArrayDeque<>(); -- -- /** */ -- private final CountDownLatch startLatch = new CountDownLatch(1); -- /** @param ctx Context. */ public GridDiscoveryManager(GridKernalContext ctx) { super(ctx, ctx.config().getDiscoverySpi()); } /** ++ * @param nodes Nodes. ++ * @return Total CPUs. ++ */ ++ private static int cpus(Collection<ClusterNode> nodes) { ++ Collection<String> macSet = new HashSet<>(nodes.size(), 1.0f); ++ ++ int cpus = 0; ++ ++ for (ClusterNode n : nodes) { ++ String macs = n.attribute(ATTR_MACS); ++ ++ if (macSet.add(macs)) ++ cpus += n.metrics().getTotalCpus(); ++ } ++ ++ return cpus; ++ } ++ ++ /** * @return Memory usage of non-heap memory. */ private MemoryUsage nonHeapMemoryUsage() { @@@ -1063,25 -1063,25 +1049,6 @@@ } /** -- * @param nodes Nodes. -- * @return Total CPUs. -- */ -- private static int cpus(Collection<ClusterNode> nodes) { -- Collection<String> macSet = new HashSet<>(nodes.size(), 1.0f); -- -- int cpus = 0; -- -- for (ClusterNode n : nodes) { -- String macs = n.attribute(ATTR_MACS); -- -- if (macSet.add(macs)) -- cpus += n.metrics().getTotalCpus(); -- } -- -- return cpus; -- } -- -- /** * Prints the latest topology info into log taking into account logging/verbosity settings. */ public void ackTopology() { @@@ -1812,105 -1812,105 +1779,291 @@@ ).start(); } - /** Worker for network segment checks. */ - private class SegmentCheckWorker extends GridWorker { ++ /** Discovery topology future. */ ++ private static class DiscoTopologyFuture extends GridFutureAdapter<Long> implements GridLocalEventListener { + /** */ - private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>(); ++ private static final long serialVersionUID = 0L; + - /** - * - */ - private SegmentCheckWorker() { - super(ctx.gridName(), "disco-net-seg-chk-worker", GridDiscoveryManager.this.log); ++ /** */ ++ private GridKernalContext ctx; + - assert hasRslvrs; - assert segChkFreq > 0; ++ /** Topology await version. */ ++ private long awaitVer; ++ ++ /** Empty constructor required by {@link Externalizable}. */ ++ private DiscoTopologyFuture() { ++ // No-op. + } + + /** - * ++ * @param ctx Context. ++ * @param awaitVer Await version. + */ - public void scheduleSegmentCheck() { - queue.add(new Object()); ++ private DiscoTopologyFuture(GridKernalContext ctx, long awaitVer) { ++ this.ctx = ctx; ++ this.awaitVer = awaitVer; + } + - /** {@inheritDoc} */ - @SuppressWarnings("StatementWithEmptyBody") - @Override protected void body() throws InterruptedException { - long lastChk = 0; ++ /** Initializes future. */ ++ private void init() { ++ ctx.event().addLocalEventListener(this, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED); + - while (!isCancelled()) { - Object req = queue.poll(2000, MILLISECONDS); ++ // Close potential window. ++ long topVer = ctx.discovery().topologyVersion(); + - long now = U.currentTimeMillis(); ++ if (topVer >= awaitVer) ++ onDone(topVer); ++ } + - // Check frequency if segment check has not been requested. - if (req == null && (segChkFreq == 0 || lastChk + segChkFreq >= now)) { - if (log.isDebugEnabled()) - log.debug("Skipping segment check as it has not been requested and it is not time to check."); ++ /** {@inheritDoc} */ ++ @Override public boolean onDone(@Nullable Long res, @Nullable Throwable err) { ++ if (super.onDone(res, err)) { ++ ctx.event().removeLocalEventListener(this, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED); + - continue; - } ++ return true; ++ } + - // We should always check segment if it has been explicitly - // requested (on any node failure or leave). - assert req != null || lastChk + segChkFreq < now; ++ return false; ++ } + - // Drain queue. - while (queue.poll() != null) { - // No-op. - } ++ /** {@inheritDoc} */ ++ @Override public void onEvent(Event evt) { ++ assert evt.type() == EVT_NODE_JOINED || evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED; + - if (lastSegChkRes.get()) { - boolean segValid = ctx.segmentation().isValidSegment(); ++ DiscoveryEvent discoEvt = (DiscoveryEvent)evt; + - lastChk = now; ++ if (discoEvt.topologyVersion() >= awaitVer) ++ onDone(discoEvt.topologyVersion()); ++ } ++ } + - if (!segValid) { - discoWrk.addEvent(EVT_NODE_SEGMENTED, AffinityTopologyVersion.NONE, getSpi().getLocalNode(), - Collections.<ClusterNode>emptyList(), null); ++ /** ++ * ++ */ ++ private static class Snapshot { ++ /** */ ++ private final AffinityTopologyVersion topVer; + - lastSegChkRes.set(false); - } ++ /** */ ++ @GridToStringExclude ++ private final DiscoCache discoCache; + - if (log.isDebugEnabled()) - log.debug("Segment has been checked [requested=" + (req != null) + ", valid=" + segValid + ']'); - } - } ++ /** ++ * @param topVer Topology version. ++ * @param discoCache Disco cache. ++ */ ++ private Snapshot(AffinityTopologyVersion topVer, DiscoCache discoCache) { ++ this.topVer = topVer; ++ this.discoCache = discoCache; + } + + /** {@inheritDoc} */ + @Override public String toString() { - return S.toString(SegmentCheckWorker.class, this); ++ return S.toString(Snapshot.class, this); + } + } + - /** Worker for discovery events. */ - private class DiscoveryWorker extends GridWorker { - /** Event queue. */ - private final BlockingQueue<GridTuple5<Integer, AffinityTopologyVersion, ClusterNode, Collection<ClusterNode>, - DiscoveryCustomMessage>> evts = new LinkedBlockingQueue<>(); ++ /** ++ * Cache predicate. ++ */ ++ private static class CachePredicate { ++ /** Cache filter. */ ++ private final IgnitePredicate<ClusterNode> cacheFilter; + - /** Node segmented event fired flag. */ - private boolean nodeSegFired; ++ /** If near cache is enabled on data nodes. */ ++ private final boolean nearEnabled; ++ ++ /** Cache mode. */ ++ private final CacheMode cacheMode; ++ ++ /** Collection of client near nodes. */ ++ private final ConcurrentHashMap<UUID, Boolean> clientNodes; + + /** - * ++ * @param cacheFilter Cache filter. ++ * @param nearEnabled Near enabled flag. ++ * @param cacheMode Cache mode. + */ - private DiscoveryWorker() { - super(ctx.gridName(), "disco-event-worker", GridDiscoveryManager.this.log); ++ private CachePredicate(IgnitePredicate<ClusterNode> cacheFilter, boolean nearEnabled, CacheMode cacheMode) { ++ assert cacheFilter != null; ++ ++ this.cacheFilter = cacheFilter; ++ this.nearEnabled = nearEnabled; ++ this.cacheMode = cacheMode; ++ ++ clientNodes = new ConcurrentHashMap<>(); + } + + /** - * Method is called when any discovery event occurs. - * - * @param type Discovery event type. See {@link DiscoveryEvent} for more details. - * @param topVer Topology version. - * @param node Remote node this event is connected with. - * @param topSnapshot Topology snapshot. ++ * @param nodeId Near node ID to add. ++ * @param nearEnabled Near enabled flag. ++ * @return {@code True} if new node ID was added. + */ - @SuppressWarnings("RedundantTypeArguments") - private void recordEvent(int type, long topVer, ClusterNode node, Collection<ClusterNode> topSnapshot) { ++ public boolean addClientNode(UUID nodeId, boolean nearEnabled) { ++ assert nodeId != null; ++ ++ Boolean old = clientNodes.putIfAbsent(nodeId, nearEnabled); ++ ++ return old == null; ++ } ++ ++ /** ++ * @param leftNodeId Left node ID. ++ * @return {@code True} if existing node ID was removed. ++ */ ++ public boolean onNodeLeft(UUID leftNodeId) { ++ assert leftNodeId != null; ++ ++ Boolean old = clientNodes.remove(leftNodeId); ++ ++ return old != null; ++ } ++ ++ /** ++ * @param node Node to check. ++ * @return {@code True} if this node is a data node for given cache. ++ */ ++ public boolean dataNode(ClusterNode node) { ++ return !node.isDaemon() && CU.affinityNode(node, cacheFilter); ++ } ++ ++ /** ++ * @param node Node to check. ++ * @return {@code True} if cache is accessible on the given node. ++ */ ++ public boolean cacheNode(ClusterNode node) { ++ return !node.isDaemon() && (CU.affinityNode(node, cacheFilter) || clientNodes.containsKey(node.id())); ++ } ++ ++ /** ++ * @param node Node to check. ++ * @return {@code True} if near cache is present on the given nodes. ++ */ ++ public boolean nearNode(ClusterNode node) { ++ if (node.isDaemon()) ++ return false; ++ ++ if (CU.affinityNode(node, cacheFilter)) ++ return nearEnabled; ++ ++ Boolean near = clientNodes.get(node.id()); ++ ++ return near != null && near; ++ } ++ ++ /** ++ * @param node Node to check. ++ * @return {@code True} if client cache is present on the given nodes. ++ */ ++ public boolean clientNode(ClusterNode node) { ++ if (node.isDaemon()) ++ return false; ++ ++ Boolean near = clientNodes.get(node.id()); ++ ++ return near != null && !near; ++ } ++ } ++ + /** Worker for network segment checks. */ + private class SegmentCheckWorker extends GridWorker { + /** */ + private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>(); + + /** + * + */ + private SegmentCheckWorker() { + super(ctx.gridName(), "disco-net-seg-chk-worker", GridDiscoveryManager.this.log); + + assert hasRslvrs; + assert segChkFreq > 0; + } + + /** + * + */ + public void scheduleSegmentCheck() { + queue.add(new Object()); + } + + /** {@inheritDoc} */ + @SuppressWarnings("StatementWithEmptyBody") + @Override protected void body() throws InterruptedException { + long lastChk = 0; + + while (!isCancelled()) { + Object req = queue.poll(2000, MILLISECONDS); + + long now = U.currentTimeMillis(); + + // Check frequency if segment check has not been requested. + if (req == null && (segChkFreq == 0 || lastChk + segChkFreq >= now)) { + if (log.isDebugEnabled()) + log.debug("Skipping segment check as it has not been requested and it is not time to check."); + + continue; + } + + // We should always check segment if it has been explicitly + // requested (on any node failure or leave). + assert req != null || lastChk + segChkFreq < now; + + // Drain queue. + while (queue.poll() != null) { + // No-op. + } + + if (lastSegChkRes.get()) { + boolean segValid = ctx.segmentation().isValidSegment(); + + lastChk = now; + + if (!segValid) { + discoWrk.addEvent(EVT_NODE_SEGMENTED, AffinityTopologyVersion.NONE, getSpi().getLocalNode(), + Collections.<ClusterNode>emptyList(), null); + + lastSegChkRes.set(false); + } + + if (log.isDebugEnabled()) + log.debug("Segment has been checked [requested=" + (req != null) + ", valid=" + segValid + ']'); + } + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(SegmentCheckWorker.class, this); + } + } + + /** Worker for discovery events. */ + private class DiscoveryWorker extends GridWorker { + /** Event queue. */ + private final BlockingQueue<GridTuple5<Integer, AffinityTopologyVersion, ClusterNode, Collection<ClusterNode>, + DiscoveryCustomMessage>> evts = new LinkedBlockingQueue<>(); + + /** Node segmented event fired flag. */ + private boolean nodeSegFired; + + /** + * + */ + private DiscoveryWorker() { + super(ctx.gridName(), "disco-event-worker", GridDiscoveryManager.this.log); + } + + /** + * Method is called when any discovery event occurs. + * + * @param type Discovery event type. See {@link DiscoveryEvent} for more details. + * @param topVer Topology version. + * @param node Remote node this event is connected with. + * @param topSnapshot Topology snapshot. + */ + @SuppressWarnings("RedundantTypeArguments") + private void recordEvent(int type, long topVer, ClusterNode node, Collection<ClusterNode> topSnapshot) { assert node != null; if (ctx.event().isRecordable(type)) { @@@ -2279,90 -2279,90 +2432,6 @@@ } } -- /** Discovery topology future. */ -- private static class DiscoTopologyFuture extends GridFutureAdapter<Long> implements GridLocalEventListener { -- /** */ -- private static final long serialVersionUID = 0L; -- -- /** */ -- private GridKernalContext ctx; -- -- /** Topology await version. */ -- private long awaitVer; -- -- /** Empty constructor required by {@link Externalizable}. */ -- private DiscoTopologyFuture() { -- // No-op. -- } -- -- /** -- * @param ctx Context. -- * @param awaitVer Await version. -- */ -- private DiscoTopologyFuture(GridKernalContext ctx, long awaitVer) { -- this.ctx = ctx; -- this.awaitVer = awaitVer; -- } -- -- /** Initializes future. */ -- private void init() { -- ctx.event().addLocalEventListener(this, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED); -- -- // Close potential window. -- long topVer = ctx.discovery().topologyVersion(); -- -- if (topVer >= awaitVer) -- onDone(topVer); -- } -- -- /** {@inheritDoc} */ -- @Override public boolean onDone(@Nullable Long res, @Nullable Throwable err) { -- if (super.onDone(res, err)) { -- ctx.event().removeLocalEventListener(this, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED); -- -- return true; -- } -- -- return false; -- } -- -- /** {@inheritDoc} */ -- @Override public void onEvent(Event evt) { -- assert evt.type() == EVT_NODE_JOINED || evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED; -- -- DiscoveryEvent discoEvt = (DiscoveryEvent)evt; -- -- if (discoEvt.topologyVersion() >= awaitVer) -- onDone(discoEvt.topologyVersion()); -- } -- } -- -- /** -- * -- */ -- private static class Snapshot { -- /** */ -- private final AffinityTopologyVersion topVer; -- -- /** */ -- @GridToStringExclude -- private final DiscoCache discoCache; -- -- /** -- * @param topVer Topology version. -- * @param discoCache Disco cache. -- */ -- private Snapshot(AffinityTopologyVersion topVer, DiscoCache discoCache) { -- this.topVer = topVer; -- this.discoCache = discoCache; -- } -- -- /** {@inheritDoc} */ -- @Override public String toString() { -- return S.toString(Snapshot.class, this); -- } -- } -- /** Cache for discovery collections. */ private class DiscoCache { /** Remote nodes. */ @@@ -2841,106 -2841,583 +2910,4 @@@ return S.toString(DiscoCache.class, this, "allNodesWithDaemons", U.toShortString(allNodes)); } } -- - /** - /** Cache for discovery collections. */ - private class DiscoCache { - /** Remote nodes. */ - private final List<ClusterNode> rmtNodes; - - /** All nodes. */ - private final List<ClusterNode> allNodes; - - /** All nodes with at least one cache configured. */ - @GridToStringInclude - private final Collection<ClusterNode> allNodesWithCaches; - - /** All nodes with at least one cache configured. */ - @GridToStringInclude - private final Collection<ClusterNode> rmtNodesWithCaches; - - /** Cache nodes by cache name. */ - @GridToStringInclude - private final Map<String, Collection<ClusterNode>> allCacheNodes; - - /** Remote cache nodes by cache name. */ - @GridToStringInclude - private final Map<String, Collection<ClusterNode>> rmtCacheNodes; - - /** Cache nodes by cache name. */ - @GridToStringInclude - private final Map<String, Collection<ClusterNode>> affCacheNodes; - - /** Caches where at least one node has near cache enabled. */ - @GridToStringInclude - private final Set<String> nearEnabledCaches; - - /** Nodes grouped by version. */ - private final NavigableMap<IgniteProductVersion, Collection<ClusterNode>> nodesByVer; - - /** Daemon nodes. */ - private final List<ClusterNode> daemonNodes; - - /** Node map. */ - private final Map<UUID, ClusterNode> nodeMap; - - /** Local node. */ - private final ClusterNode loc; - - /** Highest node order. */ - private final long maxOrder; - - /** - * Cached alive nodes list. As long as this collection doesn't accept {@code null}s use {@link - * #maskNull(String)} before passing raw cache names to it. - */ - private final ConcurrentMap<String, Collection<ClusterNode>> aliveCacheNodes; - - /** - * Cached alive remote nodes list. As long as this collection doesn't accept {@code null}s use {@link - * #maskNull(String)} before passing raw cache names to it. - */ - private final ConcurrentMap<String, Collection<ClusterNode>> aliveRmtCacheNodes; - - /** - * Cached alive remote nodes with caches. - */ - private final Collection<ClusterNode> aliveNodesWithCaches; - - /** - * Cached alive server remote nodes with caches. - */ - private final Collection<ClusterNode> aliveSrvNodesWithCaches; - - /** - * Cached alive remote server nodes with caches. - */ - private final Collection<ClusterNode> aliveRmtSrvNodesWithCaches; - - /** - * @param loc Local node. - * @param rmts Remote nodes. - */ - private DiscoCache(ClusterNode loc, Collection<ClusterNode> rmts) { - this.loc = loc; - - rmtNodes = Collections.unmodifiableList(new ArrayList<>(F.view(rmts, daemonFilter))); - - assert !rmtNodes.contains(loc) : "Remote nodes collection shouldn't contain local node" + - " [rmtNodes=" + rmtNodes + ", loc=" + loc + ']'; - - List<ClusterNode> all = new ArrayList<>(rmtNodes.size() + 1); - - if (!loc.isDaemon()) - all.add(loc); - - all.addAll(rmtNodes); - - Collections.sort(all, GridNodeOrderComparator.INSTANCE); - - allNodes = Collections.unmodifiableList(all); - - Map<String, Collection<ClusterNode>> cacheMap = new HashMap<>(allNodes.size(), 1.0f); - Map<String, Collection<ClusterNode>> rmtCacheMap = new HashMap<>(allNodes.size(), 1.0f); - Map<String, Collection<ClusterNode>> dhtNodesMap = new HashMap<>(allNodes.size(), 1.0f); - Collection<ClusterNode> nodesWithCaches = new HashSet<>(allNodes.size()); - Collection<ClusterNode> rmtNodesWithCaches = new HashSet<>(allNodes.size()); - - aliveCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f); - aliveRmtCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f); - aliveNodesWithCaches = new ConcurrentSkipListSet<>(); - aliveSrvNodesWithCaches = new ConcurrentSkipListSet<>(); - aliveRmtSrvNodesWithCaches = new ConcurrentSkipListSet<>(); - nodesByVer = new TreeMap<>(); - - long maxOrder0 = 0; - - Set<String> nearEnabledSet = new HashSet<>(); - - for (ClusterNode node : allNodes) { - assert node.order() != 0 : "Invalid node order [locNode=" + loc + ", node=" + node + ']'; - - if (node.order() > maxOrder0) - maxOrder0 = node.order(); - - boolean hasCaches = false; - - for (Map.Entry<String, CachePredicate> entry : registeredCaches.entrySet()) { - String cacheName = entry.getKey(); - - CachePredicate filter = entry.getValue(); - - if (filter.cacheNode(node)) { - nodesWithCaches.add(node); - - if (!loc.id().equals(node.id())) - rmtNodesWithCaches.add(node); - - addToMap(cacheMap, cacheName, node); - - if (alive(node.id())) - addToMap(aliveCacheNodes, maskNull(cacheName), node); - - if (filter.dataNode(node)) - addToMap(dhtNodesMap, cacheName, node); - - if (filter.nearNode(node)) - nearEnabledSet.add(cacheName); - - if (!loc.id().equals(node.id())) { - addToMap(rmtCacheMap, cacheName, node); - - if (alive(node.id())) - addToMap(aliveRmtCacheNodes, maskNull(cacheName), node); - } - - hasCaches = true; - } - } - - if (hasCaches) { - if (alive(node.id())) { - aliveNodesWithCaches.add(node); - - if (!CU.clientNode(node)) { - aliveSrvNodesWithCaches.add(node); - - if (!loc.id().equals(node.id())) - aliveRmtSrvNodesWithCaches.add(node); - } - } - } - - IgniteProductVersion nodeVer = U.productVersion(node); - - // Create collection for this version if it does not exist. - Collection<ClusterNode> nodes = nodesByVer.get(nodeVer); - - if (nodes == null) { - nodes = new ArrayList<>(allNodes.size()); - - nodesByVer.put(nodeVer, nodes); - } - - nodes.add(node); - } - - // Need second iteration to add this node to all previous node versions. - for (ClusterNode node : allNodes) { - IgniteProductVersion nodeVer = U.productVersion(node); - - // Get all versions lower or equal node's version. - NavigableMap<IgniteProductVersion, Collection<ClusterNode>> updateView = - nodesByVer.headMap(nodeVer, false); - - for (Collection<ClusterNode> prevVersions : updateView.values()) - prevVersions.add(node); - } - - maxOrder = maxOrder0; - - allCacheNodes = Collections.unmodifiableMap(cacheMap); - rmtCacheNodes = Collections.unmodifiableMap(rmtCacheMap); - affCacheNodes = Collections.unmodifiableMap(dhtNodesMap); - allNodesWithCaches = Collections.unmodifiableCollection(nodesWithCaches); - this.rmtNodesWithCaches = Collections.unmodifiableCollection(rmtNodesWithCaches); - nearEnabledCaches = Collections.unmodifiableSet(nearEnabledSet); - - daemonNodes = Collections.unmodifiableList(new ArrayList<>( - F.view(F.concat(false, loc, rmts), F0.not(daemonFilter)))); - - Map<UUID, ClusterNode> nodeMap = new HashMap<>(allNodes().size() + daemonNodes.size(), 1.0f); - - for (ClusterNode n : F.concat(false, allNodes(), daemonNodes())) - nodeMap.put(n.id(), n); - - this.nodeMap = nodeMap; - } - - /** - * Adds node to map. - * - * @param cacheMap Map to add to. - * @param cacheName Cache name. - * @param rich Node to add - */ - private void addToMap(Map<String, Collection<ClusterNode>> cacheMap, String cacheName, ClusterNode rich) { - Collection<ClusterNode> cacheNodes = cacheMap.get(cacheName); - - if (cacheNodes == null) { - cacheNodes = new ArrayList<>(allNodes.size()); - - cacheMap.put(cacheName, cacheNodes); - } - - cacheNodes.add(rich); - } - - /** @return Local node. */ - ClusterNode localNode() { - return loc; - } - - /** @return Remote nodes. */ - Collection<ClusterNode> remoteNodes() { - return rmtNodes; - } - - /** @return All nodes. */ - Collection<ClusterNode> allNodes() { - return allNodes; - } - - /** - * Gets collection of nodes which have version equal or greater than {@code ver}. - * - * @param ver Version to check. - * @return Collection of nodes with version equal or greater than {@code ver}. - */ - Collection<ClusterNode> elderNodes(IgniteProductVersion ver) { - Map.Entry<IgniteProductVersion, Collection<ClusterNode>> entry = nodesByVer.ceilingEntry(ver); - - if (entry == null) - return Collections.emptyList(); - - return entry.getValue(); - } - - /** - * @return Versions map. - */ - NavigableMap<IgniteProductVersion, Collection<ClusterNode>> versionsMap() { - return nodesByVer; - } - - /** - * Gets collection of nodes with at least one cache configured. - * - * @param topVer Topology version (maximum allowed node order). - * @return Collection of nodes. - */ - Collection<ClusterNode> allNodesWithCaches(final long topVer) { - return filter(topVer, allNodesWithCaches); - } - - /** - * Gets all nodes that have cache with given name. - * - * @param cacheName Cache name. - * @param topVer Topology version. - * @return Collection of nodes. - */ - Collection<ClusterNode> cacheNodes(@Nullable String cacheName, final long topVer) { - return filter(topVer, allCacheNodes.get(cacheName)); - } - - /** - * Gets all remote nodes that have cache with given name. - * - * @param cacheName Cache name. - * @param topVer Topology version. - * @return Collection of nodes. - */ - Collection<ClusterNode> remoteCacheNodes(@Nullable String cacheName, final long topVer) { - return filter(topVer, rmtCacheNodes.get(cacheName)); - } - - /** - * Gets all remote nodes that have at least one cache configured. - * - * @param topVer Topology version. - * @return Collection of nodes. - */ - Collection<ClusterNode> remoteCacheNodes(final long topVer) { - return filter(topVer, rmtNodesWithCaches); - } - - /** - * Gets all nodes that have cache with given name and should participate in affinity calculation. With - * partitioned cache nodes with near-only cache do not participate in affinity node calculation. - * - * @param cacheName Cache name. - * @param topVer Topology version. - * @return Collection of nodes. - */ - Collection<ClusterNode> cacheAffinityNodes(@Nullable String cacheName, final long topVer) { - return filter(topVer, affCacheNodes.get(cacheName)); - } - - /** - * Gets all alive nodes that have cache with given name. - * - * @param cacheName Cache name. - * @param topVer Topology version. - * @return Collection of nodes. - */ - Collection<ClusterNode> aliveCacheNodes(@Nullable String cacheName, final long topVer) { - return filter(topVer, aliveCacheNodes.get(maskNull(cacheName))); - } - - /** - * Gets all alive remote nodes that have cache with given name. - * - * @param cacheName Cache name. - * @param topVer Topology version. - * @return Collection of nodes. - */ - Collection<ClusterNode> aliveRemoteCacheNodes(@Nullable String cacheName, final long topVer) { - return filter(topVer, aliveRmtCacheNodes.get(maskNull(cacheName))); - } - - /** - * Gets all alive remote server nodes with at least one cache configured. - * - * @param topVer Topology version. - * @return Collection of nodes. - */ - Collection<ClusterNode> aliveRemoteServerNodesWithCaches(final long topVer) { - return filter(topVer, aliveRmtSrvNodesWithCaches); - } - - /** - * Gets all alive server nodes with at least one cache configured. - * - * @param topVer Topology version. - * @return Collection of nodes. - */ - Collection<ClusterNode> aliveServerNodesWithCaches(final long topVer) { - return filter(topVer, aliveSrvNodesWithCaches); - } - - /** - * Gets all alive remote nodes with at least one cache configured. - * - * @param topVer Topology version. - * @return Collection of nodes. - */ - Collection<ClusterNode> aliveNodesWithCaches(final long topVer) { - return filter(topVer, aliveNodesWithCaches); - } - - /** - * Checks if cache with given name has at least one node with near cache enabled. - * - * @param cacheName Cache name. - * @return {@code True} if cache with given name has at least one node with near cache enabled. - */ - boolean hasNearCache(@Nullable String cacheName) { - return nearEnabledCaches.contains(cacheName); - } - - /** - * Removes left node from cached alives lists. - * - * @param leftNode Left node. - */ - void updateAlives(ClusterNode leftNode) { - if (leftNode.order() > maxOrder) - return; - - filterNodeMap(aliveCacheNodes, leftNode); - - filterNodeMap(aliveRmtCacheNodes, leftNode); - - aliveNodesWithCaches.remove(leftNode); - aliveSrvNodesWithCaches.remove(leftNode); - aliveRmtSrvNodesWithCaches.remove(leftNode); - } - - /** - * Creates a copy of nodes map without the given node. - * - * @param map Map to copy. - * @param exclNode Node to exclude. - */ - private void filterNodeMap(ConcurrentMap<String, Collection<ClusterNode>> map, final ClusterNode exclNode) { - for (String cacheName : registeredCaches.keySet()) { - String maskedName = maskNull(cacheName); - - while (true) { - Collection<ClusterNode> oldNodes = map.get(maskedName); - - if (oldNodes == null || oldNodes.isEmpty()) - break; - - Collection<ClusterNode> newNodes = new ArrayList<>(oldNodes); - - if (!newNodes.remove(exclNode)) - break; - - if (map.replace(maskedName, oldNodes, newNodes)) - break; - } - } - } - - /** - * Replaces {@code null} with {@code NULL_CACHE_NAME}. - * - * @param cacheName Cache name. - * @return Masked name. - */ - private String maskNull(@Nullable String cacheName) { - return cacheName == null ? NULL_CACHE_NAME : cacheName; - } - - /** - * @param topVer Topology version. - * @param nodes Nodes. - * @return Filtered collection (potentially empty, but never {@code null}). - */ - private Collection<ClusterNode> filter(final long topVer, @Nullable Collection<ClusterNode> nodes) { - if (nodes == null) - return Collections.emptyList(); - - // If no filtering needed, return original collection. - return nodes.isEmpty() || topVer < 0 || topVer >= maxOrder ? - nodes : - F.view(nodes, new P1<ClusterNode>() { - @Override public boolean apply(ClusterNode node) { - return node.order() <= topVer; - } - }); - } - - /** @return Daemon nodes. */ - Collection<ClusterNode> daemonNodes() { - return daemonNodes; - } - - /** - * @param id Node ID. - * @return Node. - */ - @Nullable ClusterNode node(UUID id) { - return nodeMap.get(id); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(DiscoCache.class, this, "allNodesWithDaemons", U.toShortString(allNodes)); - } - } /** -- * Cache predicate. -- */ -- private static class CachePredicate { -- /** Cache filter. */ -- private final IgnitePredicate<ClusterNode> cacheFilter; -- -- /** If near cache is enabled on data nodes. */ -- private final boolean nearEnabled; -- -- /** Cache mode. */ -- private final CacheMode cacheMode; -- -- /** Collection of client near nodes. */ -- private final ConcurrentHashMap<UUID, Boolean> clientNodes; -- -- /** -- * @param cacheFilter Cache filter. -- * @param nearEnabled Near enabled flag. -- * @param cacheMode Cache mode. -- */ -- private CachePredicate(IgnitePredicate<ClusterNode> cacheFilter, boolean nearEnabled, CacheMode cacheMode) { -- assert cacheFilter != null; -- -- this.cacheFilter = cacheFilter; -- this.nearEnabled = nearEnabled; -- this.cacheMode = cacheMode; -- -- clientNodes = new ConcurrentHashMap<>(); -- } -- -- /** -- * @param nodeId Near node ID to add. -- * @param nearEnabled Near enabled flag. -- * @return {@code True} if new node ID was added. -- */ -- public boolean addClientNode(UUID nodeId, boolean nearEnabled) { -- assert nodeId != null; -- -- Boolean old = clientNodes.putIfAbsent(nodeId, nearEnabled); -- -- return old == null; -- } -- -- /** -- * @param leftNodeId Left node ID. -- * @return {@code True} if existing node ID was removed. -- */ -- public boolean onNodeLeft(UUID leftNodeId) { -- assert leftNodeId != null; -- -- Boolean old = clientNodes.remove(leftNodeId); -- -- return old != null; -- } -- -- /** -- * @param node Node to check. -- * @return {@code True} if this node is a data node for given cache. -- */ -- public boolean dataNode(ClusterNode node) { -- return !node.isDaemon() && CU.affinityNode(node, cacheFilter); -- } -- -- /** -- * @param node Node to check. -- * @return {@code True} if cache is accessible on the given node. -- */ -- public boolean cacheNode(ClusterNode node) { -- return !node.isDaemon() && (CU.affinityNode(node, cacheFilter) || clientNodes.containsKey(node.id())); -- } -- -- /** -- * @param node Node to check. -- * @return {@code True} if near cache is present on the given nodes. -- */ -- public boolean nearNode(ClusterNode node) { -- if (node.isDaemon()) -- return false; -- -- if (CU.affinityNode(node, cacheFilter)) -- return nearEnabled; -- -- Boolean near = clientNodes.get(node.id()); -- -- return near != null && near; -- } -- -- /** -- * @param node Node to check. -- * @return {@code True} if client cache is present on the given nodes. -- */ -- public boolean clientNode(ClusterNode node) { -- if (node.isDaemon()) -- return false; -- -- Boolean near = clientNodes.get(node.id()); -- -- return near != null && !near; -- } -- } } http://git-wip-us.apache.org/repos/asf/ignite/blob/ae09fa9f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 8c96c0c,736e630..a9919f8 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@@ -149,52 -149,52 +149,36 @@@ import static org.apache.ignite.transac public class GridCacheProcessor extends GridProcessorAdapter { /** Null cache name. */ private static final String NULL_NAME = U.id8(UUID.randomUUID()); -- -- /** Shared cache context. */ -- private GridCacheSharedContext<?, ?> sharedCtx; -- /** */ private final Map<String, GridCacheAdapter<?, ?>> caches; -- /** Caches stopped from onKernalStop callback. */ private final Map<String, GridCacheAdapter> stoppedCaches = new ConcurrentHashMap<>(); -- /** Map of proxies. */ private final Map<String, IgniteCacheProxy<?, ?>> jCacheProxies; -- /** Map of preload finish futures grouped by preload order. */ private final NavigableMap<Integer, IgniteInternalFuture<?>> preloadFuts; -- -- /** Maximum detected rebalance order. */ -- private int maxRebalanceOrder; -- /** Caches stop sequence. */ private final Deque<String> stopSeq; -- ++ /** Count down latch for caches. */ ++ private final CountDownLatch cacheStartedLatch = new CountDownLatch(1); ++ /** Shared cache context. */ ++ private GridCacheSharedContext<?, ?> sharedCtx; ++ /** Maximum detected rebalance order. */ ++ private int maxRebalanceOrder; /** Transaction interface implementation. */ private IgniteTransactionsImpl transactions; -- /** Pending cache starts. */ private ConcurrentMap<String, IgniteInternalFuture> pendingFuts = new ConcurrentHashMap<>(); -- /** Template configuration add futures. */ private ConcurrentMap<String, IgniteInternalFuture> pendingTemplateFuts = new ConcurrentHashMap<>(); -- /** Dynamic caches. */ private ConcurrentMap<String, DynamicCacheDescriptor> registeredCaches = new ConcurrentHashMap<>(); -- /** Cache templates. */ private ConcurrentMap<String, DynamicCacheDescriptor> registeredTemplates = new ConcurrentHashMap<>(); -- /** */ private IdentityHashMap<CacheStore, ThreadLocal> sesHolders = new IdentityHashMap<>(); -- /** Must use JDK marshaller since it is used by discovery to fire custom events. */ private Marshaller marshaller = new JdkMarshaller(); -- -- /** Count down latch for caches. */ -- private final CountDownLatch cacheStartedLatch = new CountDownLatch(1); -- /** */ private Map<String, DynamicCacheDescriptor> cachesOnDisconnect; @@@ -215,6 -215,6 +199,24 @@@ } /** ++ * @param name Name to mask. ++ * @return Masked name. ++ */ ++ private static String maskNull(String name) { ++ return name == null ? NULL_NAME : name; ++ } ++ ++ /** ++ * @param name Name to unmask. ++ * @return Unmasked name. ++ */ ++ @SuppressWarnings("StringEquality") ++ private static String unmaskNull(String name) { ++ // Intentional identity equality. ++ return name == NULL_NAME ? null : name; ++ } ++ ++ /** * @param internalCache Internal cache flag. * @param cfg Initializes cache configuration with proper defaults. * @param cacheObjCtx Cache object context. @@@ -2261,7 -2261,7 +2263,6 @@@ return F.first(initiateCacheChanges(F.asList(t), false)); } -- /** * @param cacheName Cache name to close. * @return Future that will be completed when cache is closed. @@@ -3365,21 -3365,21 +3366,54 @@@ } /** -- * @param name Name to mask. -- * @return Masked name. ++ * */ -- private static String maskNull(String name) { -- return name == null ? NULL_NAME : name; -- } ++ private static class LocalAffinityFunction implements AffinityFunction { ++ /** */ ++ private static final long serialVersionUID = 0L; -- /** -- * @param name Name to unmask. -- * @return Unmasked name. -- */ -- @SuppressWarnings("StringEquality") -- private static String unmaskNull(String name) { -- // Intentional identity equality. -- return name == NULL_NAME ? null : name; ++ /** {@inheritDoc} */ ++ @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) { ++ ClusterNode locNode = null; ++ ++ for (ClusterNode n : affCtx.currentTopologySnapshot()) { ++ if (n.isLocal()) { ++ locNode = n; ++ ++ break; ++ } ++ } ++ ++ if (locNode == null) ++ throw new IgniteException("Local node is not included into affinity nodes for 'LOCAL' cache"); ++ ++ List<List<ClusterNode>> res = new ArrayList<>(partitions()); ++ ++ for (int part = 0; part < partitions(); part++) ++ res.add(Collections.singletonList(locNode)); ++ ++ return Collections.unmodifiableList(res); ++ } ++ ++ /** {@inheritDoc} */ ++ @Override public void reset() { ++ // No-op. ++ } ++ ++ /** {@inheritDoc} */ ++ @Override public int partitions() { ++ return 1; ++ } ++ ++ /** {@inheritDoc} */ ++ @Override public int partition(Object key) { ++ return 0; ++ } ++ ++ /** {@inheritDoc} */ ++ @Override public void removeNode(UUID nodeId) { ++ // No-op. ++ } } /** @@@ -3478,55 -3478,95 +3512,4 @@@ return S.toString(TemplateConfigurationFuture.class, this); } } -- -- /** - * - */ - @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor") - private class TemplateConfigurationFuture extends GridFutureAdapter<Object> { - /** Start ID. */ - @GridToStringInclude - private IgniteUuid deploymentId; - - /** Cache name. */ - private String cacheName; - - /** - * @param cacheName Cache name. - * @param deploymentId Deployment ID. - */ - private TemplateConfigurationFuture(String cacheName, IgniteUuid deploymentId) { - this.deploymentId = deploymentId; - this.cacheName = cacheName; - } - - /** - * @return Start ID. - */ - public IgniteUuid deploymentId() { - return deploymentId; - } - - /** {@inheritDoc} */ - @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) { - // Make sure to remove future before completion. - pendingTemplateFuts.remove(maskNull(cacheName), this); - - return super.onDone(res, err); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(TemplateConfigurationFuture.class, this); - } - } /** -- * -- */ -- private static class LocalAffinityFunction implements AffinityFunction { -- /** */ -- private static final long serialVersionUID = 0L; -- -- /** {@inheritDoc} */ -- @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) { -- ClusterNode locNode = null; -- -- for (ClusterNode n : affCtx.currentTopologySnapshot()) { -- if (n.isLocal()) { -- locNode = n; -- -- break; -- } -- } -- -- if (locNode == null) -- throw new IgniteException("Local node is not included into affinity nodes for 'LOCAL' cache"); -- -- List<List<ClusterNode>> res = new ArrayList<>(partitions()); -- -- for (int part = 0; part < partitions(); part++) -- res.add(Collections.singletonList(locNode)); -- -- return Collections.unmodifiableList(res); -- } -- -- /** {@inheritDoc} */ -- @Override public void reset() { -- // No-op. -- } -- -- /** {@inheritDoc} */ -- @Override public int partitions() { -- return 1; -- } -- -- /** {@inheritDoc} */ -- @Override public int partition(Object key) { -- return 0; -- } -- -- /** {@inheritDoc} */ -- @Override public void removeNode(UUID nodeId) { -- // No-op. -- } -- } } http://git-wip-us.apache.org/repos/asf/ignite/blob/ae09fa9f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index f907d5b,698b035..c94cc46 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@@ -136,33 -136,33 +136,24 @@@ import static org.apache.ignite.interna public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapter<K, V> { /** */ public static int MAX_ITERATORS = 1000; -- ++ /** */ ++ private final ConcurrentMap<UUID, Map<Long, GridFutureAdapter<QueryResult<K, V>>>> qryIters = ++ new ConcurrentHashMap8<>(); ++ /** */ ++ private final ConcurrentMap<UUID, Map<Long, GridFutureAdapter<FieldsResult>>> fieldsQryRes = ++ new ConcurrentHashMap8<>(); ++ /** */ ++ private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); /** */ protected GridQueryProcessor qryProc; -- /** */ private String space; -- /** */ private int maxIterCnt; -- /** */ private volatile GridCacheQueryMetricsAdapter metrics = new GridCacheQueryMetricsAdapter(); -- -- /** */ -- private final ConcurrentMap<UUID, Map<Long, GridFutureAdapter<QueryResult<K, V>>>> qryIters = -- new ConcurrentHashMap8<>(); -- -- /** */ -- private final ConcurrentMap<UUID, Map<Long, GridFutureAdapter<FieldsResult>>> fieldsQryRes = -- new ConcurrentHashMap8<>(); -- /** */ private volatile ConcurrentMap<Object, CachedResult<?>> qryResCache = new ConcurrentHashMap8<>(); -- -- /** */ -- private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); -- /** Event listener. */ private GridLocalEventListener lsnr; @@@ -172,6 -172,6 +163,17 @@@ /** */ private AffinityTopologyVersion qryTopVer; ++ /** ++ * @param sndId Sender node ID. ++ * @param reqId Request ID. ++ * @return Recipient ID. ++ */ ++ private static Object recipient(UUID sndId, long reqId) { ++ assert sndId != null; ++ ++ return new IgniteBiTuple<>(sndId, reqId); ++ } ++ /** {@inheritDoc} */ @Override public void start0() throws IgniteCheckedException { qryProc = cctx.kernalContext().query(); @@@ -1715,17 -1715,17 +1717,6 @@@ } /** -- * @param sndId Sender node ID. -- * @param reqId Request ID. -- * @return Recipient ID. -- */ -- private static Object recipient(UUID sndId, long reqId) { -- assert sndId != null; -- -- return new IgniteBiTuple<>(sndId, reqId); -- } -- -- /** * @param qryInfo Info. * @return Iterator. * @throws IgniteCheckedException In case of error. @@@ -2031,6 -2031,6 +2022,89 @@@ } /** ++ * Query for {@link IndexingSpi}. ++ * ++ * @param keepPortable Keep portable flag. ++ * @return Query. ++ */ ++ public <R> CacheQuery<R> createSpiQuery(boolean keepPortable) { ++ return new GridCacheQueryAdapter<>(cctx, ++ SPI, ++ null, ++ null, ++ null, ++ null, ++ false, ++ keepPortable); ++ } ++ ++ /** ++ * Creates user's predicate based scan query. ++ * ++ * @param filter Scan filter. ++ * @param part Partition. ++ * @param keepPortable Keep portable flag. ++ * @return Created query. ++ */ ++ public CacheQuery<Map.Entry<K, V>> createScanQuery(@Nullable IgniteBiPredicate<K, V> filter, ++ @Nullable Integer part, boolean keepPortable) { ++ ++ return new GridCacheQueryAdapter<>(cctx, ++ SCAN, ++ null, ++ null, ++ (IgniteBiPredicate<Object, Object>)filter, ++ part, ++ false, ++ keepPortable); ++ } ++ ++ /** ++ * Creates user's full text query, queried class, and query clause. For more information refer to {@link CacheQuery} ++ * documentation. ++ * ++ * @param clsName Query class name. ++ * @param search Search clause. ++ * @param keepPortable Keep portable flag. ++ * @return Created query. ++ */ ++ public CacheQuery<Map.Entry<K, V>> createFullTextQuery(String clsName, ++ String search, boolean keepPortable) { ++ A.notNull("clsName", clsName); ++ A.notNull("search", search); ++ ++ return new GridCacheQueryAdapter<>(cctx, ++ TEXT, ++ clsName, ++ search, ++ null, ++ null, ++ false, ++ keepPortable); ++ } ++ ++ /** ++ * Creates user's SQL fields query for given clause. For more information refer to {@link CacheQuery} ++ * documentation. ++ * ++ * @param qry Query. ++ * @param keepPortable Keep portable flag. ++ * @return Created query. ++ */ ++ public CacheQuery<List<?>> createSqlFieldsQuery(String qry, boolean keepPortable) { ++ A.notNull(qry, "qry"); ++ ++ return new GridCacheQueryAdapter<>(cctx, ++ SQL_FIELDS, ++ null, ++ qry, ++ null, ++ null, ++ false, ++ keepPortable); ++ } ++ ++ /** * Metadata job. */ @GridInternal @@@ -2435,315 -2435,313 +2509,98 @@@ /** * */ -- private abstract class AbstractLazySwapEntry { ++ private static class CompoundIterator<T> extends GridIteratorAdapter<T> { /** */ -- private K key; ++ private static final long serialVersionUID = 4585888051556166304L; /** */ -- private V val; ++ private final List<GridIterator<T>> iters; -- /** -- * @return Key bytes. -- */ -- protected abstract byte[] keyBytes(); ++ /** */ ++ private int idx; -- /** -- * @return Value. -- * @throws IgniteCheckedException If failed. -- */ -- protected abstract V unmarshalValue() throws IgniteCheckedException; ++ /** */ ++ private GridIterator<T> iter; /** -- * @return Key. ++ * @param iters Iterators. */ -- K key() { -- try { -- if (key != null) -- return key; ++ private CompoundIterator(List<GridIterator<T>> iters) { ++ if (iters.isEmpty()) ++ throw new IllegalArgumentException(); -- key = cctx.toCacheKeyObject(keyBytes()).value(cctx.cacheObjectContext(), false); ++ this.iters = iters; -- return key; -- } -- catch (IgniteCheckedException e) { -- throw new IgniteException(e); -- } ++ iter = F.first(iters); } -- /** -- * @return Value. -- */ -- V value() { -- try { -- if (val != null) -- return val; ++ /** {@inheritDoc} */ ++ @Override public boolean hasNextX() throws IgniteCheckedException { ++ if (iter.hasNextX()) ++ return true; -- val = unmarshalValue(); ++ idx++; -- return val; -- } -- catch (IgniteCheckedException e) { -- throw new IgniteException(e); ++ while (idx < iters.size()) { ++ iter = iters.get(idx); ++ ++ if (iter.hasNextX()) ++ return true; ++ ++ idx++; } ++ ++ return false; } -- /** -- * @return TTL. -- */ -- abstract long timeToLive(); ++ /** {@inheritDoc} */ ++ @Override public T nextX() throws IgniteCheckedException { ++ if (!hasNextX()) ++ throw new NoSuchElementException(); -- /** -- * @return Expire time. -- */ -- abstract long expireTime(); ++ return iter.nextX(); ++ } -- /** -- * @return Version. -- */ -- abstract GridCacheVersion version(); ++ /** {@inheritDoc} */ ++ @Override public void removeX() throws IgniteCheckedException { ++ throw new UnsupportedOperationException(); ++ } } /** -- * ++ * Cached result. */ -- private class LazySwapEntry extends AbstractLazySwapEntry { ++ private abstract static class CachedResult<R> extends GridFutureAdapter<IgniteSpiCloseableIterator<R>> { ++ /** Absolute position of each recipient. */ ++ private final Map<Object, QueueIterator> recipients = new GridLeanMap<>(1); /** */ -- private final Map.Entry<byte[], byte[]> e; ++ private CircularQueue<R> queue; ++ /** */ ++ private int pruned; /** -- * @param e Entry with ++ * @param rcpt ID of the recipient. */ -- LazySwapEntry(Map.Entry<byte[], byte[]> e) { -- this.e = e; -- } ++ protected CachedResult(Object rcpt) { ++ boolean res = addRecipient(rcpt); -- /** {@inheritDoc} */ -- @Override protected byte[] keyBytes() { -- return e.getKey(); ++ assert res; } -- /** {@inheritDoc} */ -- @SuppressWarnings("IfMayBeConditional") -- @Override protected V unmarshalValue() throws IgniteCheckedException { -- IgniteBiTuple<byte[], Byte> t = GridCacheSwapEntryImpl.getValue(e.getValue()); ++ /** ++ * Close if this result does not have any other recipients. ++ * ++ * @param rcpt ID of the recipient. ++ * @throws IgniteCheckedException If failed. ++ */ ++ public void closeIfNotShared(Object rcpt) throws IgniteCheckedException { ++ assert isDone(); -- CacheObject obj = cctx.cacheObjects().toCacheObject(cctx.cacheObjectContext(), t.get2(), t.get1()); ++ synchronized (recipients) { ++ if (recipients.isEmpty()) ++ return; -- return obj.value(cctx.cacheObjectContext(), false); -- } -- -- /** {@inheritDoc} */ -- @Override long timeToLive() { -- return GridCacheSwapEntryImpl.timeToLive(e.getValue()); -- } -- -- /** {@inheritDoc} */ -- @Override long expireTime() { -- return GridCacheSwapEntryImpl.expireTime(e.getValue()); -- } -- -- /** {@inheritDoc} */ -- @Override GridCacheVersion version() { -- return GridCacheSwapEntryImpl.version(e.getValue()); -- } -- } -- -- /** -- * -- */ -- private class LazyOffheapEntry extends AbstractLazySwapEntry { -- /** */ -- private final T2<Long, Integer> keyPtr; -- -- /** */ -- private final T2<Long, Integer> valPtr; -- -- /** -- * @param keyPtr Key address. -- * @param valPtr Value address. -- */ -- private LazyOffheapEntry(T2<Long, Integer> keyPtr, T2<Long, Integer> valPtr) { -- assert keyPtr != null; -- assert valPtr != null; -- -- this.keyPtr = keyPtr; -- this.valPtr = valPtr; -- } -- -- /** {@inheritDoc} */ -- @Override protected byte[] keyBytes() { -- return U.copyMemory(keyPtr.get1(), keyPtr.get2()); -- } -- -- /** {@inheritDoc} */ -- @Override protected V unmarshalValue() throws IgniteCheckedException { -- long ptr = GridCacheOffheapSwapEntry.valueAddress(valPtr.get1(), valPtr.get2()); -- -- CacheObject obj = cctx.fromOffheap(ptr, false); -- -- V val = CU.value(obj, cctx, false); -- -- assert val != null; -- -- return val; -- } -- -- /** {@inheritDoc} */ -- @Override long timeToLive() { -- return GridCacheOffheapSwapEntry.timeToLive(valPtr.get1()); -- } -- -- /** {@inheritDoc} */ -- @Override long expireTime() { -- return GridCacheOffheapSwapEntry.expireTime(valPtr.get1()); -- } -- -- /** {@inheritDoc} */ -- @Override GridCacheVersion version() { -- return GridCacheOffheapSwapEntry.version(valPtr.get1()); -- } -- } -- -- /** -- * -- */ -- private class OffheapIteratorClosure -- extends CX2<T2<Long, Integer>, T2<Long, Integer>, IgniteBiTuple<K, V>> { -- /** */ -- private static final long serialVersionUID = 7410163202728985912L; -- -- /** */ -- private IgniteBiPredicate<K, V> filter; -- -- /** */ -- private boolean keepPortable; -- -- /** -- * @param filter Filter. -- * @param keepPortable Keep portable flag. -- */ -- private OffheapIteratorClosure( -- @Nullable IgniteBiPredicate<K, V> filter, -- boolean keepPortable) { -- assert filter != null; -- -- this.filter = filter; -- this.keepPortable = keepPortable; -- } -- -- /** {@inheritDoc} */ -- @Nullable @Override public IgniteBiTuple<K, V> applyx(T2<Long, Integer> keyPtr, -- T2<Long, Integer> valPtr) -- throws IgniteCheckedException { -- LazyOffheapEntry e = new LazyOffheapEntry(keyPtr, valPtr); -- -- K key = (K)cctx.unwrapPortableIfNeeded(e.key(), keepPortable); -- V val = (V)cctx.unwrapPortableIfNeeded(e.value(), keepPortable); -- -- if (!filter.apply(key, val)) -- return null; -- -- return new IgniteBiTuple<>(e.key(), (V)cctx.unwrapTemporary(e.value())); -- } -- } -- -- /** -- * -- */ -- private static class CompoundIterator<T> extends GridIteratorAdapter<T> { -- /** */ -- private static final long serialVersionUID = 4585888051556166304L; -- -- /** */ -- private final List<GridIterator<T>> iters; -- -- /** */ -- private int idx; -- -- /** */ -- private GridIterator<T> iter; -- -- /** -- * @param iters Iterators. -- */ -- private CompoundIterator(List<GridIterator<T>> iters) { -- if (iters.isEmpty()) -- throw new IllegalArgumentException(); -- -- this.iters = iters; -- -- iter = F.first(iters); -- } -- -- /** {@inheritDoc} */ -- @Override public boolean hasNextX() throws IgniteCheckedException { -- if (iter.hasNextX()) -- return true; -- -- idx++; -- -- while (idx < iters.size()) { -- iter = iters.get(idx); -- -- if (iter.hasNextX()) -- return true; -- -- idx++; -- } -- -- return false; -- } -- -- /** {@inheritDoc} */ -- @Override public T nextX() throws IgniteCheckedException { -- if (!hasNextX()) -- throw new NoSuchElementException(); -- -- return iter.nextX(); -- } -- -- /** {@inheritDoc} */ -- @Override public void removeX() throws IgniteCheckedException { -- throw new UnsupportedOperationException(); -- } -- } -- -- /** -- * Cached result. -- */ -- private abstract static class CachedResult<R> extends GridFutureAdapter<IgniteSpiCloseableIterator<R>> { - /** Absolute position of each recipient. */ - private final Map<Object, QueueIterator> recipients = new GridLeanMap<>(1); -- /** */ -- private CircularQueue<R> queue; - -- /** */ -- private int pruned; - - /** Absolute position of each recipient. */ - private final Map<Object, QueueIterator> recipients = new GridLeanMap<>(1); -- -- /** -- * @param rcpt ID of the recipient. -- */ -- protected CachedResult(Object rcpt) { -- boolean res = addRecipient(rcpt); -- -- assert res; -- } -- -- /** -- * Close if this result does not have any other recipients. -- * -- * @param rcpt ID of the recipient. -- * @throws IgniteCheckedException If failed. -- */ -- public void closeIfNotShared(Object rcpt) throws IgniteCheckedException { -- assert isDone(); -- -- synchronized (recipients) { -- if (recipients.isEmpty()) -- return; -- -- recipients.remove(rcpt); ++ recipients.remove(rcpt); if (recipients.isEmpty()) get().close(); @@@ -3022,85 -3020,126 +2879,217 @@@ } /** -- * Query for {@link IndexingSpi}. * -- * @param keepPortable Keep portable flag. -- * @return Query. */ -- public <R> CacheQuery<R> createSpiQuery(boolean keepPortable) { -- return new GridCacheQueryAdapter<>(cctx, -- SPI, -- null, -- null, -- null, -- null, -- false, -- keepPortable); ++ private abstract class AbstractLazySwapEntry { ++ /** */ ++ private K key; ++ ++ /** */ ++ private V val; ++ ++ /** ++ * @return Key bytes. ++ */ ++ protected abstract byte[] keyBytes(); ++ ++ /** ++ * @return Value. ++ * @throws IgniteCheckedException If failed. ++ */ ++ protected abstract V unmarshalValue() throws IgniteCheckedException; ++ ++ /** ++ * @return Key. ++ */ ++ K key() { ++ try { ++ if (key != null) ++ return key; ++ ++ key = cctx.toCacheKeyObject(keyBytes()).value(cctx.cacheObjectContext(), false); ++ ++ return key; ++ } ++ catch (IgniteCheckedException e) { ++ throw new IgniteException(e); ++ } ++ } ++ ++ /** ++ * @return Value. ++ */ ++ V value() { ++ try { ++ if (val != null) ++ return val; ++ ++ val = unmarshalValue(); ++ ++ return val; ++ } ++ catch (IgniteCheckedException e) { ++ throw new IgniteException(e); ++ } ++ } ++ ++ /** ++ * @return TTL. ++ */ ++ abstract long timeToLive(); ++ ++ /** ++ * @return Expire time. ++ */ ++ abstract long expireTime(); ++ ++ /** ++ * @return Version. ++ */ ++ abstract GridCacheVersion version(); } /** -- * Creates user's predicate based scan query. * -- * @param filter Scan filter. -- * @param part Partition. -- * @param keepPortable Keep portable flag. -- * @return Created query. */ -- public CacheQuery<Map.Entry<K, V>> createScanQuery(@Nullable IgniteBiPredicate<K, V> filter, -- @Nullable Integer part, boolean keepPortable) { ++ private class LazySwapEntry extends AbstractLazySwapEntry { ++ /** */ ++ private final Map.Entry<byte[], byte[]> e; -- return new GridCacheQueryAdapter<>(cctx, -- SCAN, -- null, -- null, -- (IgniteBiPredicate<Object, Object>)filter, -- part, -- false, -- keepPortable); ++ /** ++ * @param e Entry with ++ */ ++ LazySwapEntry(Map.Entry<byte[], byte[]> e) { ++ this.e = e; ++ } ++ ++ /** {@inheritDoc} */ ++ @Override protected byte[] keyBytes() { ++ return e.getKey(); ++ } ++ ++ /** {@inheritDoc} */ ++ @SuppressWarnings("IfMayBeConditional") ++ @Override protected V unmarshalValue() throws IgniteCheckedException { ++ IgniteBiTuple<byte[], Byte> t = GridCacheSwapEntryImpl.getValue(e.getValue()); ++ ++ CacheObject obj = cctx.cacheObjects().toCacheObject(cctx.cacheObjectContext(), t.get2(), t.get1()); ++ ++ return obj.value(cctx.cacheObjectContext(), false); ++ } ++ ++ /** {@inheritDoc} */ ++ @Override long timeToLive() { ++ return GridCacheSwapEntryImpl.timeToLive(e.getValue()); ++ } ++ ++ /** {@inheritDoc} */ ++ @Override long expireTime() { ++ return GridCacheSwapEntryImpl.expireTime(e.getValue()); ++ } ++ ++ /** {@inheritDoc} */ ++ @Override GridCacheVersion version() { ++ return GridCacheSwapEntryImpl.version(e.getValue()); ++ } } /** -- * Creates user's full text query, queried class, and query clause. For more information refer to {@link CacheQuery} -- * documentation. * -- * @param clsName Query class name. -- * @param search Search clause. -- * @param keepPortable Keep portable flag. -- * @return Created query. */ -- public CacheQuery<Map.Entry<K, V>> createFullTextQuery(String clsName, -- String search, boolean keepPortable) { -- A.notNull("clsName", clsName); -- A.notNull("search", search); ++ private class LazyOffheapEntry extends AbstractLazySwapEntry { ++ /** */ ++ private final T2<Long, Integer> keyPtr; -- return new GridCacheQueryAdapter<>(cctx, -- TEXT, -- clsName, -- search, -- null, -- null, -- false, -- keepPortable); ++ /** */ ++ private final T2<Long, Integer> valPtr; ++ ++ /** ++ * @param keyPtr Key address. ++ * @param valPtr Value address. ++ */ ++ private LazyOffheapEntry(T2<Long, Integer> keyPtr, T2<Long, Integer> valPtr) { ++ assert keyPtr != null; ++ assert valPtr != null; ++ ++ this.keyPtr = keyPtr; ++ this.valPtr = valPtr; ++ } ++ ++ /** {@inheritDoc} */ ++ @Override protected byte[] keyBytes() { ++ return U.copyMemory(keyPtr.get1(), keyPtr.get2()); ++ } ++ ++ /** {@inheritDoc} */ ++ @Override protected V unmarshalValue() throws IgniteCheckedException { ++ long ptr = GridCacheOffheapSwapEntry.valueAddress(valPtr.get1(), valPtr.get2()); ++ ++ CacheObject obj = cctx.fromOffheap(ptr, false); ++ ++ V val = CU.value(obj, cctx, false); ++ ++ assert val != null; ++ ++ return val; ++ } ++ ++ /** {@inheritDoc} */ ++ @Override long timeToLive() { ++ return GridCacheOffheapSwapEntry.timeToLive(valPtr.get1()); ++ } ++ ++ /** {@inheritDoc} */ ++ @Override long expireTime() { ++ return GridCacheOffheapSwapEntry.expireTime(valPtr.get1()); ++ } ++ ++ /** {@inheritDoc} */ ++ @Override GridCacheVersion version() { ++ return GridCacheOffheapSwapEntry.version(valPtr.get1()); ++ } } /** - * Creates user's SQL fields query for given clause. For more information refer to {@link CacheQuery} - * documentation. * - * @param qry Query. - * @param keepPortable Keep portable flag. - * @return Created query. */ - public CacheQuery<List<?>> createSqlFieldsQuery(String qry, boolean keepPortable) { - A.notNull(qry, "qry"); + private class OffheapIteratorClosure + extends CX2<T2<Long, Integer>, T2<Long, Integer>, IgniteBiTuple<K, V>> { + /** */ + private static final long serialVersionUID = 7410163202728985912L; - return new GridCacheQueryAdapter<>(cctx, - SQL_FIELDS, - null, - qry, - null, - null, - false, - keepPortable); + /** */ + private IgniteBiPredicate<K, V> filter; + + /** */ + private boolean keepPortable; + + /** + * @param filter Filter. + * @param keepPortable Keep portable flag. + */ + private OffheapIteratorClosure( + @Nullable IgniteBiPredicate<K, V> filter, + boolean keepPortable) { + assert filter != null; + + this.filter = filter; + this.keepPortable = keepPortable; + } + + /** {@inheritDoc} */ + @Nullable @Override public IgniteBiTuple<K, V> applyx(T2<Long, Integer> keyPtr, + T2<Long, Integer> valPtr) + throws IgniteCheckedException { + LazyOffheapEntry e = new LazyOffheapEntry(keyPtr, valPtr); + + K key = (K)cctx.unwrapPortableIfNeeded(e.key(), keepPortable); + V val = (V)cctx.unwrapPortableIfNeeded(e.value(), keepPortable); + + if (!filter.apply(key, val)) + return null; + + return new IgniteBiTuple<>(e.key(), (V)cctx.unwrapTemporary(e.value())); + } - } /** - * Creates user's SQL fields query for given clause. For more information refer to {@link CacheQuery} - * documentation. - * - * @param qry Query. - * @param keepPortable Keep portable flag. - * @return Created query. - */ - public CacheQuery<List<?>> createSqlFieldsQuery(String qry, boolean keepPortable) { - A.notNull(qry, "qry"); - - return new GridCacheQueryAdapter<>(cctx, - SQL_FIELDS, - null, - qry, - null, - null, - false, - keepPortable); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/ae09fa9f/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java index 9117863,df79232..afb599d --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java @@@ -92,13 -91,13 +92,10 @@@ public class GridRestProcessor extends /** Default session timout. */ private static final int DEFAULT_SES_TIMEOUT = 30_000; -- -- /** Protocols. */ -- private final Collection<GridRestProtocol> protos = new ArrayList<>(); -- /** Command handlers. */ protected final Map<GridRestCommand, GridRestCommandHandler> handlers = new EnumMap<>(GridRestCommand.class); -- ++ /** Protocols. */ ++ private final Collection<GridRestProtocol> protos = new ArrayList<>(); /** */ private final CountDownLatch startLatch = new CountDownLatch(1); @@@ -132,6 -131,6 +129,85 @@@ private final long sesTtl; /** ++ * @param ctx Context. ++ */ ++ public GridRestProcessor(GridKernalContext ctx) { ++ super(ctx); ++ ++ long sesExpTime0; ++ String sesExpTime = null; ++ ++ try { ++ sesExpTime = System.getProperty(IgniteSystemProperties.IGNITE_REST_SESSION_TIMEOUT); ++ ++ if (sesExpTime != null) ++ sesExpTime0 = Long.valueOf(sesExpTime) * 1000; ++ else ++ sesExpTime0 = DEFAULT_SES_TIMEOUT; ++ } ++ catch (NumberFormatException ignore) { ++ U.warn(log, "Failed parsing IGNITE_REST_SESSION_TIMEOUT system variable [IGNITE_REST_SESSION_TIMEOUT=" ++ + sesExpTime + "]"); ++ ++ sesExpTime0 = DEFAULT_SES_TIMEOUT; ++ } ++ ++ sesTtl = sesExpTime0; ++ ++ sesTimeoutCheckerThread = new IgniteThread(ctx.gridName(), "session-timeout-worker", ++ new GridWorker(ctx.gridName(), "session-timeout-worker", log) { ++ @Override protected void body() throws InterruptedException { ++ while (!isCancelled()) { ++ Thread.sleep(SES_TIMEOUT_CHECK_DELAY); ++ ++ for (Map.Entry<UUID, Session> e : sesId2Ses.entrySet()) { ++ Session ses = e.getValue(); ++ ++ if (ses.isTimedOut(sesTtl)) { ++ sesId2Ses.remove(ses.sesId, ses); ++ ++ clientId2SesId.remove(ses.clientId, ses.sesId); ++ } ++ } ++ } ++ } ++ }); ++ } ++ ++/** ++ * Applies interceptor to a response object. ++ * Specially handler {@link Map} and {@link Collection} responses. ++ * ++ * @param obj Response object. ++ * @param interceptor Interceptor to apply. ++ * @return Intercepted object. ++ */ ++ private static Object interceptSendObject(Object obj, ConnectorMessageInterceptor interceptor) { ++ if (obj instanceof Map) { ++ Map<Object, Object> original = (Map<Object, Object>)obj; ++ ++ Map<Object, Object> m = new HashMap<>(); ++ ++ for (Map.Entry e : original.entrySet()) ++ m.put(interceptor.onSend(e.getKey()), interceptor.onSend(e.getValue())); ++ ++ return m; ++ } ++ else if (obj instanceof Collection) { ++ Collection<Object> original = (Collection<Object>)obj; ++ ++ Collection<Object> c = new ArrayList<>(original.size()); ++ ++ for (Object e : original) ++ c.add(interceptor.onSend(e)); ++ ++ return c; ++ } ++ else ++ return interceptor.onSend(obj); ++ } ++ ++ /** * @param req Request. * @return Future. */ @@@ -386,52 -385,52 +462,6 @@@ } } -- /** -- * @param ctx Context. -- */ -- public GridRestProcessor(GridKernalContext ctx) { -- super(ctx); -- -- long sesExpTime0; -- String sesExpTime = null; -- -- try { -- sesExpTime = System.getProperty(IgniteSystemProperties.IGNITE_REST_SESSION_TIMEOUT); -- -- if (sesExpTime != null) -- sesExpTime0 = Long.valueOf(sesExpTime) * 1000; -- else -- sesExpTime0 = DEFAULT_SES_TIMEOUT; -- } -- catch (NumberFormatException ignore) { -- U.warn(log, "Failed parsing IGNITE_REST_SESSION_TIMEOUT system variable [IGNITE_REST_SESSION_TIMEOUT=" -- + sesExpTime + "]"); -- -- sesExpTime0 = DEFAULT_SES_TIMEOUT; -- } -- -- sesTtl = sesExpTime0; -- -- sesTimeoutCheckerThread = new IgniteThread(ctx.gridName(), "session-timeout-worker", -- new GridWorker(ctx.gridName(), "session-timeout-worker", log) { -- @Override protected void body() throws InterruptedException { -- while (!isCancelled()) { -- Thread.sleep(SES_TIMEOUT_CHECK_DELAY); -- -- for (Map.Entry<UUID, Session> e : sesId2Ses.entrySet()) { -- Session ses = e.getValue(); -- -- if (ses.isTimedOut(sesTtl)) { -- sesId2Ses.remove(ses.sesId, ses); -- -- clientId2SesId.remove(ses.clientId, ses.sesId); -- } -- } -- } -- } -- }); -- } -- /** {@inheritDoc} */ @Override public void start() throws IgniteCheckedException { if (isRestEnabled()) { @@@ -516,7 -517,54 +546,7 @@@ } } - /** - * Applies {@link ConnectorMessageInterceptor} - * from {@link ConnectorConfiguration#getMessageInterceptor()} ()} - * to all user parameters in the request. - * - * @param req Client request. - */ - private void interceptRequest(GridRestRequest req) { - ConnectorMessageInterceptor interceptor = config().getMessageInterceptor(); - - if (interceptor == null) - return; - - if (req instanceof GridRestCacheRequest) { - GridRestCacheRequest req0 = (GridRestCacheRequest) req; - - req0.key(interceptor.onReceive(req0.key())); - req0.value(interceptor.onReceive(req0.value())); - req0.value2(interceptor.onReceive(req0.value2())); - - Map<Object, Object> oldVals = req0.values(); - - if (oldVals != null) { - Map<Object, Object> newVals = U.newHashMap(oldVals.size()); - - for (Map.Entry<Object, Object> e : oldVals.entrySet()) - newVals.put(interceptor.onReceive(e.getKey()), interceptor.onReceive(e.getValue())); - - req0.values(U.sealMap(newVals)); - } - } - else if (req instanceof GridRestTaskRequest) { - GridRestTaskRequest req0 = (GridRestTaskRequest) req; - - List<Object> oldParams = req0.params(); - - if (oldParams != null) { - Collection<Object> newParams = new ArrayList<>(oldParams.size()); - - for (Object o : oldParams) - newParams.add(interceptor.onReceive(o)); - - req0.params(U.sealList(newParams)); - } - } - } - -- /** ++ /** * Applies {@link ConnectorMessageInterceptor} * from {@link ConnectorConfiguration#getMessageInterceptor()} ()} * to all user parameters in the request. @@@ -562,8 -610,8 +592,7 @@@ } } } -- -- /** ++/** * Applies {@link ConnectorMessageInterceptor} from * {@link ConnectorConfiguration#getMessageInterceptor()} * to all user objects in the response.
