IGNITE-4834: Added ability to execute custom tasks from exchange thread.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/61c845d6 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/61c845d6 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/61c845d6 Branch: refs/heads/ignite-1192 Commit: 61c845d66b82463bfad71c28093f2cbf54d99eb0 Parents: 9020d12 Author: devozerov <[email protected]> Authored: Mon Mar 20 10:16:36 2017 +0300 Committer: devozerov <[email protected]> Committed: Mon Mar 20 10:16:36 2017 +0300 ---------------------------------------------------------------------- .../cache/CachePartitionExchangeWorkerTask.java | 29 ++++ .../GridCachePartitionExchangeManager.java | 171 +++++++++++++------ .../processors/cache/GridCacheProcessor.java | 19 +++ .../GridDhtPartitionsExchangeFuture.java | 8 +- 4 files changed, 176 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/61c845d6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java new file mode 100644 index 0000000..80ef9f5 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +/** + * Cache partition exchange worker task marker interface. + */ +public interface CachePartitionExchangeWorkerTask { + /** + * @return {@code True} if task denotes standard exchange task, {@code false} if this is a custom task which + * must be executed from within exchange thread. + */ + boolean isExchange(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/61c845d6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 3e72efb..f7edb08 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -32,7 +32,6 @@ import java.util.Map; import java.util.NavigableMap; import java.util.TreeMap; import java.util.UUID; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.LinkedBlockingDeque; @@ -55,6 +54,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException; import 
org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.events.DiscoveryCustomEvent; import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; @@ -222,10 +222,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana exchFut = exchangeFuture(exchId, evt, cache,null, null); } else { - DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)evt; + DiscoveryCustomMessage customMsg = ((DiscoveryCustomEvent)evt).customMessage(); - if (customEvt.customMessage() instanceof DynamicCacheChangeBatch) { - DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customEvt.customMessage(); + if (customMsg instanceof DynamicCacheChangeBatch) { + DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customMsg; Collection<DynamicCacheChangeRequest> valid = new ArrayList<>(batch.requests().size()); @@ -257,8 +257,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana exchFut = exchangeFuture(exchId, evt, cache, valid, null); } } - else if (customEvt.customMessage() instanceof CacheAffinityChangeMessage) { - CacheAffinityChangeMessage msg = (CacheAffinityChangeMessage)customEvt.customMessage(); + else if (customMsg instanceof CacheAffinityChangeMessage) { + CacheAffinityChangeMessage msg = (CacheAffinityChangeMessage)customMsg; if (msg.exchangeId() == null) { if (msg.exchangeNeeded()) { @@ -267,8 +267,18 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana exchFut = exchangeFuture(exchId, evt, cache, null, msg); } } - else - exchangeFuture(msg.exchangeId(), null, null, null, 
null).onAffinityChangeMessage(customEvt.eventNode(), msg); + else { + exchangeFuture(msg.exchangeId(), null, null, null, null) + .onAffinityChangeMessage(evt.eventNode(), msg); + } + } + else { + // Process event as custom discovery task if needed. + CachePartitionExchangeWorkerTask task = + cctx.cache().exchangeTaskForCustomDiscoveryMessage(customMsg); + + if (task != null) + exchWorker.addCustomTask(task); } } @@ -369,7 +379,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (reconnect) reconnectExchangeFut = new GridFutureAdapter<>(); - exchWorker.futQ.addFirst(fut); + exchWorker.addFirstExchangeFuture(fut); if (!cctx.kernalContext().clientNode()) { for (int cnt = 0; cnt < cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) { @@ -684,7 +694,18 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * @return {@code True} if pending future queue is empty. */ public boolean hasPendingExchange() { - return !exchWorker.futQ.isEmpty(); + return exchWorker.hasPendingExchange(); + } + + /** + * Add custom task. + * + * @param task Task. 
+ */ + public void addCustomTask(CachePartitionExchangeWorkerTask task) { + assert !task.isExchange(); + + exchWorker.addCustomTask(task); } /** @@ -704,7 +725,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana */ public void forceDummyExchange(boolean reassign, GridDhtPartitionsExchangeFuture exchFut) { - exchWorker.addFuture( + exchWorker.addExchangeFuture( new GridDhtPartitionsExchangeFuture(cctx, reassign, exchFut.discoveryEvent(), exchFut.exchangeId())); } @@ -716,7 +737,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana public IgniteInternalFuture<Boolean> forceRebalance(GridDhtPartitionsExchangeFuture exchFut) { GridFutureAdapter<Boolean> fut = new GridFutureAdapter<>(); - exchWorker.addFuture( + exchWorker.addExchangeFuture( new GridDhtPartitionsExchangeFuture(cctx, exchFut.discoveryEvent(), exchFut.exchangeId(), fut)); return fut; @@ -1192,7 +1213,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana */ private boolean addFuture(GridDhtPartitionsExchangeFuture fut) { if (fut.onAdded()) { - exchWorker.addFuture(fut); + exchWorker.addExchangeFuture(fut); return true; } @@ -1345,10 +1366,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana U.warn(log, "Last exchange future: " + lastInitializedFut); - U.warn(log, "Pending exchange futures:"); - - for (GridDhtPartitionsExchangeFuture fut : exchWorker.futQ) - U.warn(log, ">>> " + fut); + exchWorker.dumpExchangeDebugInfo(); if (!readyFuts.isEmpty()) { U.warn(log, "Pending affinity ready futures:"); @@ -1547,28 +1565,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** - * @param deque Deque to poll from. - * @param time Time to wait. - * @param w Worker. - * @return Polled item. - * @throws InterruptedException If interrupted. 
- */ - @Nullable private <T> T poll(BlockingQueue<T> deque, long time, GridWorker w) throws InterruptedException { - assert w != null; - - // There is currently a case where {@code interrupted} - // flag on a thread gets flipped during stop which causes the pool to hang. This check - // will always make sure that interrupted flag gets reset before going into wait conditions. - // The true fix should actually make sure that interrupted flag does not get reset or that - // interrupted exception gets propagated. Until we find a real fix, this method should - // always work to make sure that there is no hanging during stop. - if (w.isCancelled()) - Thread.currentThread().interrupt(); - - return deque.poll(time, MILLISECONDS); - } - - /** * @param node Target node. * @return {@code True} if can use compression for partition map messages. */ @@ -1592,32 +1588,94 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana */ private class ExchangeWorker extends GridWorker { /** Future queue. */ - private final LinkedBlockingDeque<GridDhtPartitionsExchangeFuture> futQ = + private final LinkedBlockingDeque<CachePartitionExchangeWorkerTask> futQ = new LinkedBlockingDeque<>(); /** Busy flag used as performance optimization to stop current preloading. */ private volatile boolean busy; /** - * + * Constructor. */ private ExchangeWorker() { super(cctx.igniteInstanceName(), "partition-exchanger", GridCachePartitionExchangeManager.this.log); } /** + * Add first exchange future. + * + * @param exchFut Exchange future. + */ + void addFirstExchangeFuture(GridDhtPartitionsExchangeFuture exchFut) { + futQ.addFirst(exchFut); + } + + /** * @param exchFut Exchange future. 
*/ - void addFuture(GridDhtPartitionsExchangeFuture exchFut) { + void addExchangeFuture(GridDhtPartitionsExchangeFuture exchFut) { assert exchFut != null; - if (!exchFut.dummy() || (futQ.isEmpty() && !busy)) + if (!exchFut.dummy() || (!hasPendingExchange() && !busy)) futQ.offer(exchFut); if (log.isDebugEnabled()) log.debug("Added exchange future to exchange worker: " + exchFut); } + /** + * Add custom exchange task. + * + * @param task Task. + */ + void addCustomTask(CachePartitionExchangeWorkerTask task) { + assert task != null; + + assert !task.isExchange(); + + futQ.offer(task); + } + + /** + * Process custom exchange task. + * + * @param task Task. + */ + void processCustomTask(CachePartitionExchangeWorkerTask task) { + try { + cctx.cache().processCustomExchangeTask(task); + } + catch (Exception e) { + U.warn(log, "Failed to process custom exchange task: " + task, e); + } + } + + /** + * @return Whether pending exchange future exists. + */ + boolean hasPendingExchange() { + if (!futQ.isEmpty()) { + for (CachePartitionExchangeWorkerTask task : futQ) { + if (task.isExchange()) + return true; + } + } + + return false; + } + + /** + * Dump debug info. + */ + void dumpExchangeDebugInfo() { + U.warn(log, "Pending exchange futures:"); + + for (CachePartitionExchangeWorkerTask task: futQ) { + if (task.isExchange()) + U.warn(log, ">>> " + task); + } + } + /** {@inheritDoc} */ @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { long timeout = cctx.gridConfig().getNetworkTimeout(); @@ -1625,7 +1683,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana int cnt = 0; while (!isCancelled()) { - GridDhtPartitionsExchangeFuture exchFut = null; + CachePartitionExchangeWorkerTask task = null; cnt++; @@ -1640,7 +1698,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } // If not first preloading and no more topology events present. 
- if (!cctx.kernalContext().clientNode() && futQ.isEmpty() && preloadFinished) + if (!cctx.kernalContext().clientNode() && !hasPendingExchange() && preloadFinished) timeout = cctx.gridConfig().getNetworkTimeout(); // After workers line up and before preloading starts we initialize all futures. @@ -1656,11 +1714,24 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } // Take next exchange future. - exchFut = poll(futQ, timeout, this); + if (isCancelled()) + Thread.currentThread().interrupt(); - if (exchFut == null) + task = futQ.poll(timeout, MILLISECONDS); + + if (task == null) continue; // Main while loop. + if (!task.isExchange()) { + processCustomTask(task); + + continue; + } + + assert task instanceof GridDhtPartitionsExchangeFuture; + + GridDhtPartitionsExchangeFuture exchFut = (GridDhtPartitionsExchangeFuture)task; + busy = true; Map<Integer, GridDhtPreloaderAssignments> assignsMap = null; @@ -1727,7 +1798,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana changed |= cacheCtx.topology().afterExchange(exchFut); } - if (!cctx.kernalContext().clientNode() && changed && futQ.isEmpty()) + if (!cctx.kernalContext().clientNode() && changed && !hasPendingExchange()) refreshPartitions(); } else { @@ -1824,7 +1895,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana U.log(log, "Rebalancing scheduled [order=" + rebList + "]"); - if (futQ.isEmpty()) { + if (!hasPendingExchange()) { U.log(log, "Rebalancing started " + "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() + ", node=" + exchFut.discoveryEvent().eventNode().id() + ']'); @@ -1850,7 +1921,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } catch (IgniteCheckedException e) { U.error(log, "Failed to wait for completion of partition map exchange " + - "(preloading will not start): " + exchFut, e); + "(preloading will not start): " + task, e); } } } 
http://git-wip-us.apache.org/repos/asf/ignite/blob/61c845d6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index c7ac31a..a7d38a7 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -367,6 +367,25 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** + * Create exchange worker task for custom discovery message. + * + * @param msg Custom discovery message. + * @return Task or {@code null} if message doesn't require any special processing. + */ + public CachePartitionExchangeWorkerTask exchangeTaskForCustomDiscoveryMessage(DiscoveryCustomMessage msg) { + return null; + } + + /** + * Process custom exchange task. + * + * @param task Task. + */ + public void processCustomExchangeTask(CachePartitionExchangeWorkerTask task) { + // No-op. + } + + /** * @param c Ignite configuration. * @param cc Configuration to validate. * @param cacheType Cache type. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/61c845d6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 46fb144..50937a8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -48,6 +48,7 @@ import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; +import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask; import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -84,7 +85,7 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYS * Future for exchanging partition maps. 
*/ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityTopologyVersion> - implements Comparable<GridDhtPartitionsExchangeFuture>, GridDhtTopologyFuture { + implements Comparable<GridDhtPartitionsExchangeFuture>, GridDhtTopologyFuture, CachePartitionExchangeWorkerTask { /** */ public static final int DUMP_PENDING_OBJECTS_THRESHOLD = IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_DUMP_PENDING_OBJECTS_THRESHOLD, 10); @@ -1677,6 +1678,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } /** {@inheritDoc} */ + @Override public boolean isExchange() { + return true; + } + + /** {@inheritDoc} */ @Override public int compareTo(GridDhtPartitionsExchangeFuture fut) { return exchId.compareTo(fut.exchId); }
