IGNITE-500 CacheLoadingConcurrentGridStartSelfTest fails (fixed DataStreamer data loss on unstable topology in !allowOverwrite mode)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b7499828 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b7499828 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b7499828 Branch: refs/heads/ignite-4242 Commit: b7499828c928e02e8e554f960f3754e4d08bfbe0 Parents: 8b59f4e Author: Anton Vinogradov <[email protected]> Authored: Thu Nov 10 16:10:21 2016 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Thu Nov 10 16:10:21 2016 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/IgniteDataStreamer.java | 2 +- .../processors/cache/GridCacheMapEntry.java | 5 +- .../processors/cache/GridCacheMvccManager.java | 77 +++ .../GridCachePartitionExchangeManager.java | 5 + .../cache/GridCacheSharedContext.java | 1 + .../datastreamer/DataStreamProcessor.java | 104 +++- .../datastreamer/DataStreamerImpl.java | 603 ++++++++++++++----- .../ignite/internal/util/GridLogThrottle.java | 29 +- .../cache/IgniteCacheDynamicStopSelfTest.java | 48 +- ...CacheLoadingConcurrentGridStartSelfTest.java | 251 +++++++- ...ncurrentGridStartSelfTestAllowOverwrite.java | 30 + .../DataStreamProcessorSelfTest.java | 4 +- .../datastreamer/DataStreamerImplSelfTest.java | 170 ++++-- .../DataStreamerMultiThreadedSelfTest.java | 2 - .../datastreamer/DataStreamerTimeoutTest.java | 92 ++- .../testsuites/IgniteCacheTestSuite2.java | 2 + 16 files changed, 1120 insertions(+), 305 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java index 484fee9..4e00d66 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java @@ -291,7 +291,7 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable { * @throws IllegalStateException If grid has been concurrently stopped or * {@link #close(boolean)} has already been called on streamer. */ - public IgniteFuture<?> removeData(K key) throws CacheException, IgniteInterruptedException, IllegalStateException; + public IgniteFuture<?> removeData(K key) throws CacheException, IgniteInterruptedException, IllegalStateException; /** * Adds data for streaming on remote node. This method can be called from multiple http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 5996672..950153f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -3444,11 +3444,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (val == null) { skipQryNtf = true; - if (cctx.deferredDelete() && !isInternal()) { - assert !deletedUnlocked(); - + if (cctx.deferredDelete() && !deletedUnlocked() && !isInternal()) deletedUnlocked(true); - } } else if (deletedUnlocked()) deletedUnlocked(false); http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index c4db01e..c57e17c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@ -46,6 +46,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet; import org.apache.ignite.internal.util.GridConcurrentFactory; +import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -108,6 +109,9 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { private final ConcurrentMap<GridCacheVersion, GridCacheAtomicFuture<?>> atomicFuts = new ConcurrentHashMap8<>(); + /** Pending data streamer futures. */ + private final GridConcurrentHashSet<DataStreamerFuture> dataStreamerFuts = new GridConcurrentHashSet<>(); + /** */ private final ConcurrentMap<IgniteUuid, GridCacheFuture<?>> futs = new ConcurrentHashMap8<>(); @@ -446,6 +450,13 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { } /** + * @return Collection of pending data streamer futures. + */ + public Collection<DataStreamerFuture> dataStreamerFutures() { + return dataStreamerFuts; + } + + /** * Gets future by given future ID. * * @param futVer Future ID. @@ -476,6 +487,21 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { } /** + * @param topVer Topology version. 
+ */ + public GridFutureAdapter addDataStreamerFuture(AffinityTopologyVersion topVer) { + final DataStreamerFuture fut = new DataStreamerFuture(topVer); + + boolean add = dataStreamerFuts.add(fut); + + assert add; + + return fut; + } + + /** + + /** * Adds future. * * @param fut Future. @@ -1056,6 +1082,22 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { } /** + * + * @return Finish update future. + */ + @SuppressWarnings("unchecked") + public IgniteInternalFuture<?> finishDataStreamerUpdates() { + GridCompoundFuture<Object, Object> res = new GridCompoundFuture<>(); + + for (IgniteInternalFuture fut : dataStreamerFuts) + res.add(fut); + + res.markInitialized(); + + return res; + } + + /** * @param keys Key for which locks should be released. * @param cacheId Cache ID. * @param topVer Topology version. @@ -1294,4 +1336,39 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { CachePartialUpdateCheckedException.class.isAssignableFrom(cls); } } + + /** + * + */ + private class DataStreamerFuture extends GridFutureAdapter<Void> { + /** */ + private static final long serialVersionUID = 0L; + + /** Topology version. Instance field for toString method only. */ + @GridToStringInclude + private final AffinityTopologyVersion topVer; + + /** + * @param topVer Topology version. 
+ */ + DataStreamerFuture(AffinityTopologyVersion topVer) { + this.topVer = topVer; + } + + /** {@inheritDoc} */ + @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) { + if (super.onDone(res, err)) { + dataStreamerFuts.remove(this); + + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(DataStreamerFuture.class, this, super.toString()); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index a901e2a..00d2d16 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1309,6 +1309,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana for (GridCacheFuture<?> fut : mvcc.atomicFutures()) U.warn(log, ">>> " + fut); + U.warn(log, "Pending data streamer futures:"); + + for (IgniteInternalFuture<?> fut : mvcc.dataStreamerFutures()) + U.warn(log, ">>> " + fut); + if (tm != null) { U.warn(log, "Pending transaction deadlock detection futures:"); http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index 8f39235..117a5c3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -636,6 +636,7 @@ public class GridCacheSharedContext<K, V> { f.add(mvcc().finishExplicitLocks(topVer)); f.add(tm().finishTxs(topVer)); f.add(mvcc().finishAtomicUpdates(topVer)); + f.add(mvcc().finishDataStreamerUpdates()); f.markInitialized(); http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java index 7663735..32fda87 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.UUID; import java.util.concurrent.DelayQueue; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; @@ -29,13 +30,18 @@ import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.deployment.GridDeployment; import org.apache.ignite.internal.processors.GridProcessorAdapter; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import 
org.apache.ignite.internal.processors.cache.GridCacheAdapter; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.GridSpinBusyLock; +import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.stream.StreamReceiver; import org.apache.ignite.thread.IgniteThread; @@ -288,32 +294,94 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter { return; } - Collection<DataStreamerEntry> col = req.entries(); + localUpdate(nodeId, req, updater, topic); + } + finally { + busyLock.leaveBusy(); + } + } - DataStreamerUpdateJob job = new DataStreamerUpdateJob(ctx, - log, - req.cacheName(), - col, - req.ignoreDeploymentOwnership(), - req.skipStore(), - req.keepBinary(), - updater); + /** + * @param nodeId Node id. + * @param req Request. + * @param updater Updater. + * @param topic Topic. 
+ */ + private void localUpdate(final UUID nodeId, + final DataStreamerRequest req, + final StreamReceiver<K, V> updater, + final Object topic) { + final boolean allowOverwrite = !(updater instanceof DataStreamerImpl.IsolatedUpdater); - Exception err = null; + try { + GridCacheAdapter cache = ctx.cache().internalCache(req.cacheName()); + + if (cache == null) + throw new IgniteCheckedException("Cache not created or already destroyed."); + + GridCacheContext cctx = cache.context(); + + DataStreamerUpdateJob job = null; + + GridFutureAdapter waitFut = null; + + if (!allowOverwrite) + cctx.topology().readLock(); try { - job.call(); - } - catch (Exception e) { - U.error(log, "Failed to finish update job.", e); + GridDhtTopologyFuture fut = cctx.topologyVersionFuture(); - err = e; + AffinityTopologyVersion topVer = fut.topologyVersion(); + + if (!allowOverwrite && !topVer.equals(req.topologyVersion())) { + Exception err = new IgniteCheckedException( + "DataStreamer will retry data transfer at stable topology " + + "[reqTop=" + req.topologyVersion() + ", topVer=" + topVer + ", node=remote]"); + + sendResponse(nodeId, topic, req.requestId(), err, req.forceLocalDeployment()); + } + else if (allowOverwrite || fut.isDone()) { + job = new DataStreamerUpdateJob(ctx, + log, + req.cacheName(), + req.entries(), + req.ignoreDeploymentOwnership(), + req.skipStore(), + req.keepBinary(), + updater); + + waitFut = allowOverwrite ? 
null : cctx.mvcc().addDataStreamerFuture(topVer); + } + else { + fut.listen(new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> e) { + localUpdate(nodeId, req, updater, topic); + } + }); + } + } + finally { + if (!allowOverwrite) + cctx.topology().readUnlock(); } - sendResponse(nodeId, topic, req.requestId(), err, req.forceLocalDeployment()); + if (job != null) { + try { + job.call(); + + sendResponse(nodeId, topic, req.requestId(), null, req.forceLocalDeployment()); + } + finally { + if (waitFut != null) + waitFut.onDone(); + } + } } - finally { - busyLock.leaveBusy(); + catch (Throwable e) { + sendResponse(nodeId, topic, req.requestId(), e, req.forceLocalDeployment()); + + if (e instanceof Error) + throw (Error)e; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index a6065dd..443783b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Queue; import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; @@ -39,15 +40,15 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import javax.cache.CacheException; import javax.cache.expiry.ExpiryPolicy; - 
import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.IgniteDataStreamerTimeoutException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.IgniteLogger; -import org.apache.ignite.IgniteDataStreamerTimeoutException; +import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.cluster.ClusterTopologyException; import org.apache.ignite.configuration.CacheConfiguration; @@ -72,11 +73,13 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; +import org.apache.ignite.internal.processors.cache.GridCacheGateway; import org.apache.ignite.internal.processors.cache.GridCacheUtils; import org.apache.ignite.internal.processors.cache.IgniteCacheFutureImpl; import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; import org.apache.ignite.internal.processors.dr.GridDrType; @@ -92,6 +95,8 @@ import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.CU; +import 
org.apache.ignite.internal.util.typedef.internal.GPC; +import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteClosure; @@ -102,6 +107,7 @@ import org.apache.ignite.plugin.security.SecurityPermission; import org.apache.ignite.stream.StreamReceiver; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; +import org.jsr166.LongAdder8; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; @@ -113,12 +119,15 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUB */ @SuppressWarnings("unchecked") public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed { - /** Default policy reoslver. */ + /** Default policy resolver. */ private static final DefaultIoPolicyResolver DFLT_IO_PLC_RSLVR = new DefaultIoPolicyResolver(); /** Isolated receiver. */ private static final StreamReceiver ISOLATED_UPDATER = new IsolatedUpdater(); + /** Amount of permissions should be available to continue new data processing. */ + private static final int REMAP_SEMAPHORE_PERMISSIONS_COUNT = Integer.MAX_VALUE; + /** Cache receiver. */ private StreamReceiver<K, V> rcvr = ISOLATED_UPDATER; @@ -178,6 +187,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed /** {@code True} if data loader has been cancelled. */ private volatile boolean cancelled; + /** Fail counter. */ + private final LongAdder8 failCntr = new LongAdder8(); + /** Active futures of this data loader. 
*/ @GridToStringInclude private final Collection<IgniteInternalFuture<?>> activeFuts = new GridConcurrentHashSet<>(); @@ -189,6 +201,16 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed boolean rmv = activeFuts.remove(t); assert rmv; + + Throwable err = t.error(); + + if (err != null && !(err instanceof IgniteClientDisconnectedCheckedException)) { + LT.error(log, t.error(), "DataStreamer operation failed.", true); + + failCntr.increment(); + + cancelled = true; + } } }; @@ -231,6 +253,15 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed /** Whether a warning at {@link DataStreamerImpl#allowOverwrite()} printed */ private static boolean isWarningPrinted; + /** Allows to pause new data processing while failed data processing in progress. */ + private final Semaphore remapSem = new Semaphore(REMAP_SEMAPHORE_PERMISSIONS_COUNT); + + /** */ + private final ConcurrentLinkedDeque<Runnable> dataToRemap = new ConcurrentLinkedDeque<>(); + + /** */ + private final AtomicBoolean remapOwning = new AtomicBoolean(); + /** * @param ctx Grid kernal context. * @param cacheName Cache name. @@ -301,7 +332,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed Buffer buf = bufMappings.get(nodeId); if (buf != null) - buf.onResponse(res); + buf.onResponse(res, nodeId); else if (log.isDebugEnabled()) log.debug("Ignoring response since node has left [nodeId=" + nodeId + ", "); @@ -314,6 +345,17 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed fut = new DataStreamerFuture(this); publicFut = new IgniteCacheFutureImpl<>(fut); + + GridCacheAdapter cache = ctx.cache().internalCache(cacheName); + + if (cache == null) { // Possible, cache is not configured on node. 
+ assert ccfg != null; + + if (ccfg.getCacheMode() == CacheMode.LOCAL) + throw new CacheException("Impossible to load Local cache configured remotely."); + + ctx.grid().getOrCreateCache(ccfg); + } } /** @@ -358,6 +400,11 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed throw new IllegalStateException("Data streamer has been closed."); } + else if (cancelled) { + busyLock.leaveBusy(); + + throw new IllegalStateException("Data streamer has been closed."); + } } /** @@ -633,6 +680,37 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed } /** + * + */ + private void acquireRemapSemaphore() throws IgniteInterruptedCheckedException { + try { + if (remapSem.availablePermits() != REMAP_SEMAPHORE_PERMISSIONS_COUNT) { + if (timeout == DFLT_UNLIMIT_TIMEOUT) { + // Wait until failed data being processed. + remapSem.acquire(REMAP_SEMAPHORE_PERMISSIONS_COUNT); + + remapSem.release(REMAP_SEMAPHORE_PERMISSIONS_COUNT); + } + else { + // Wait until failed data being processed. + boolean res = remapSem.tryAcquire(REMAP_SEMAPHORE_PERMISSIONS_COUNT, timeout, TimeUnit.MILLISECONDS); + + if (res) + remapSem.release(REMAP_SEMAPHORE_PERMISSIONS_COUNT); + else + throw new IgniteDataStreamerTimeoutException("Data streamer exceeded timeout " + + "while was waiting for failed data resending finished."); + } + } + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new IgniteInterruptedCheckedException(e); + } + } + + /** * @param entries Entries. * @param resFut Result future. * @param activeKeys Active keys. 
@@ -644,170 +722,266 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed @Nullable final Collection<KeyCacheObjectWrapper> activeKeys, final int remaps ) { - assert entries != null; + try { + assert entries != null; - if (!isWarningPrinted) { - synchronized (this) { - if (!allowOverwrite() && !isWarningPrinted) { - U.warn(log, "Data streamer will not overwrite existing cache entries for better performance " + - "(to change, set allowOverwrite to true)"); - } + final boolean remap = remaps > 0; - isWarningPrinted = true; + if (!remap) { // Failed data should be processed prior to new data. + acquireRemapSemaphore(); } - } - Map<ClusterNode, Collection<DataStreamerEntry>> mappings = new HashMap<>(); + if (!isWarningPrinted) { + synchronized (this) { + if (!allowOverwrite() && !isWarningPrinted) { + U.warn(log, "Data streamer will not overwrite existing cache entries for better performance " + + "(to change, set allowOverwrite to true)"); + } - boolean initPda = ctx.deploy().enabled() && jobPda == null; + isWarningPrinted = true; + } + } - AffinityTopologyVersion topVer = ctx.cache().context().exchange().readyAffinityVersion(); + Map<ClusterNode, Collection<DataStreamerEntry>> mappings = new HashMap<>(); - for (DataStreamerEntry entry : entries) { - List<ClusterNode> nodes; + boolean initPda = ctx.deploy().enabled() && jobPda == null; - try { - KeyCacheObject key = entry.getKey(); + GridCacheAdapter cache = ctx.cache().internalCache(cacheName); - assert key != null; + if (cache == null) + throw new IgniteCheckedException("Cache not created or already destroyed."); - if (initPda) { - if (cacheObjCtx.addDeploymentInfo()) - jobPda = new DataStreamerPda(key.value(cacheObjCtx, false), - entry.getValue() != null ? 
entry.getValue().value(cacheObjCtx, false) : null, - rcvr); - else if (rcvr != null) - jobPda = new DataStreamerPda(rcvr); + GridCacheContext cctx = cache.context(); - initPda = false; - } + GridCacheGateway gate = null; - nodes = nodes(key, topVer); - } - catch (IgniteCheckedException e) { - resFut.onDone(e); + if (!allowOverwrite() && !cctx.isLocal()) { // Cases where cctx required. + gate = cctx.gate(); - return; + gate.enter(); } - if (F.isEmpty(nodes)) { - resFut.onDone(new ClusterTopologyException("Failed to map key to node " + - "(no nodes with cache found in topology) [infos=" + entries.size() + - ", cacheName=" + cacheName + ']')); - - return; - } + try { + AffinityTopologyVersion topVer = allowOverwrite() || cctx.isLocal() ? + ctx.cache().context().exchange().readyAffinityVersion() : + cctx.topology().topologyVersion(); - for (ClusterNode node : nodes) { - Collection<DataStreamerEntry> col = mappings.get(node); + for (DataStreamerEntry entry : entries) { + List<ClusterNode> nodes; - if (col == null) - mappings.put(node, col = new ArrayList<>()); + try { + KeyCacheObject key = entry.getKey(); - col.add(entry); - } - } + assert key != null; - for (final Map.Entry<ClusterNode, Collection<DataStreamerEntry>> e : mappings.entrySet()) { - final UUID nodeId = e.getKey().id(); + if (initPda) { + if (cacheObjCtx.addDeploymentInfo()) + jobPda = new DataStreamerPda(key.value(cacheObjCtx, false), + entry.getValue() != null ? 
entry.getValue().value(cacheObjCtx, false) : null, + rcvr); + else if (rcvr != null) + jobPda = new DataStreamerPda(rcvr); - Buffer buf = bufMappings.get(nodeId); + initPda = false; + } - if (buf == null) { - Buffer old = bufMappings.putIfAbsent(nodeId, buf = new Buffer(e.getKey())); + nodes = nodes(key, topVer, cctx); + } + catch (IgniteCheckedException e) { + resFut.onDone(e); - if (old != null) - buf = old; - } + return; + } - final Collection<DataStreamerEntry> entriesForNode = e.getValue(); + if (F.isEmpty(nodes)) { + resFut.onDone(new ClusterTopologyException("Failed to map key to node " + + "(no nodes with cache found in topology) [infos=" + entries.size() + + ", cacheName=" + cacheName + ']')); - IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> t) { - try { - t.get(); + return; + } - if (activeKeys != null) { - for (DataStreamerEntry e : entriesForNode) - activeKeys.remove(new KeyCacheObjectWrapper(e.getKey())); + for (ClusterNode node : nodes) { + Collection<DataStreamerEntry> col = mappings.get(node); - if (activeKeys.isEmpty()) - resFut.onDone(); - } - else { - assert entriesForNode.size() == 1; + if (col == null) + mappings.put(node, col = new ArrayList<>()); - // That has been a single key, - // so complete result future right away. 
- resFut.onDone(); - } + col.add(entry); } - catch (IgniteClientDisconnectedCheckedException e1) { - if (log.isDebugEnabled()) - log.debug("Future finished with disconnect error [nodeId=" + nodeId + ", err=" + e1 + ']'); + } - resFut.onDone(e1); + for (final Map.Entry<ClusterNode, Collection<DataStreamerEntry>> e : mappings.entrySet()) { + final UUID nodeId = e.getKey().id(); + + Buffer buf = bufMappings.get(nodeId); + + if (buf == null) { + Buffer old = bufMappings.putIfAbsent(nodeId, buf = new Buffer(e.getKey())); + + if (old != null) + buf = old; } - catch (IgniteCheckedException e1) { - if (log.isDebugEnabled()) - log.debug("Future finished with error [nodeId=" + nodeId + ", err=" + e1 + ']'); - if (cancelled) { - resFut.onDone(new IgniteCheckedException("Data streamer has been cancelled: " + - DataStreamerImpl.this, e1)); + final Collection<DataStreamerEntry> entriesForNode = e.getValue(); + + IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> t) { + try { + t.get(); + + if (activeKeys != null) { + for (DataStreamerEntry e : entriesForNode) + activeKeys.remove(new KeyCacheObjectWrapper(e.getKey())); + + if (activeKeys.isEmpty()) + resFut.onDone(); + } + else { + assert entriesForNode.size() == 1; + + // That has been a single key, + // so complete result future right away. 
+ resFut.onDone(); + } + } + catch (IgniteClientDisconnectedCheckedException e1) { + if (log.isDebugEnabled()) + log.debug("Future finished with disconnect error [nodeId=" + nodeId + ", err=" + e1 + ']'); + + resFut.onDone(e1); + } + catch (IgniteCheckedException e1) { + if (log.isDebugEnabled()) + log.debug("Future finished with error [nodeId=" + nodeId + ", err=" + e1 + ']'); + + if (cancelled) { + resFut.onDone(new IgniteCheckedException("Data streamer has been cancelled: " + + DataStreamerImpl.this, e1)); + } + else if (remaps + 1 > maxRemapCnt) { + resFut.onDone(new IgniteCheckedException("Failed to finish operation (too many remaps): " + + remaps, e1)); + } + else { + try { + remapSem.acquire(); + + final Runnable r = new Runnable() { + @Override public void run() { + try { + if (cancelled) + throw new IllegalStateException("DataStreamer closed."); + + load0(entriesForNode, resFut, activeKeys, remaps + 1); + } + catch (Throwable ex) { + resFut.onDone( + new IgniteCheckedException("DataStreamer remapping failed. 
", ex)); + } + finally { + remapSem.release(); + } + } + }; + + dataToRemap.add(r); + + if (!remapOwning.get() && remapOwning.compareAndSet(false, true)) { + ctx.closure().callLocalSafe(new GPC<Boolean>() { + @Override public Boolean call() { + boolean locked = true; + + while (locked || !dataToRemap.isEmpty()) { + if (!locked && !remapOwning.compareAndSet(false, true)) + return false; + + try { + Runnable r = dataToRemap.poll(); + + if (r != null) + r.run(); + } + finally { + if (!dataToRemap.isEmpty()) + locked = true; + else { + remapOwning.set(false); + + locked = false; + } + } + } + + return true; + } + }, true); + } + } + catch (InterruptedException e2) { + resFut.onDone(e2); + } + } + } } - else if (remaps + 1 > maxRemapCnt) { - resFut.onDone(new IgniteCheckedException("Failed to finish operation (too many remaps): " - + remaps), e1); - } - else - load0(entriesForNode, resFut, activeKeys, remaps + 1); - } - } - }; + }; - final GridFutureAdapter<?> f; + final GridFutureAdapter<?> f; - try { - f = buf.update(entriesForNode, topVer, lsnr); - } - catch (IgniteInterruptedCheckedException e1) { - resFut.onDone(e1); + try { + f = buf.update(entriesForNode, topVer, lsnr, remap); + } + catch (IgniteInterruptedCheckedException e1) { + resFut.onDone(e1); - return; - } + return; + } - if (ctx.discovery().node(nodeId) == null) { - if (bufMappings.remove(nodeId, buf)) { - final Buffer buf0 = buf; + if (ctx.discovery().node(nodeId) == null) { + if (bufMappings.remove(nodeId, buf)) { + final Buffer buf0 = buf; - waitAffinityAndRun(new Runnable() { - @Override public void run() { - buf0.onNodeLeft(); + waitAffinityAndRun(new Runnable() { + @Override public void run() { + buf0.onNodeLeft(); - if (f != null) - f.onDone(new ClusterTopologyCheckedException("Failed to wait for request completion " + - "(node has left): " + nodeId)); + if (f != null) + f.onDone(new ClusterTopologyCheckedException("Failed to wait for request completion " + + "(node has left): " + nodeId)); + } + 
}, ctx.discovery().topologyVersion(), false); } - }, ctx.discovery().topologyVersion(), false); + } } } + finally { + if (gate != null) + gate.leave(); + } + } + catch (Exception ex) { + resFut.onDone(new IgniteCheckedException("DataStreamer data loading failed.", ex)); } } /** * @param key Key to map. * @param topVer Topology version. + * @param cctx Context. * @return Nodes to send requests to. * @throws IgniteCheckedException If failed. */ - private List<ClusterNode> nodes(KeyCacheObject key, AffinityTopologyVersion topVer) throws IgniteCheckedException { + private List<ClusterNode> nodes(KeyCacheObject key, + AffinityTopologyVersion topVer, + GridCacheContext cctx) throws IgniteCheckedException { GridAffinityProcessor aff = ctx.affinity(); List<ClusterNode> res = null; if (!allowOverwrite()) - res = aff.mapKeyToPrimaryAndBackups(cacheName, key, topVer); + res = cctx.isLocal() ? + aff.mapKeyToPrimaryAndBackups(cacheName, key, topVer) : + cctx.topology().nodes(cctx.affinity().partition(key), topVer); else { ClusterNode node = aff.mapKeyToNode(cacheName, key, topVer); @@ -992,7 +1166,10 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed * @throws IgniteCheckedException If failed. */ public void closeEx(boolean cancel) throws IgniteCheckedException { - closeEx(cancel, null); + IgniteCheckedException err = closeEx(cancel, null); + + if (err != null) + throw err; // Throws at close(). } /** @@ -1000,9 +1177,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed * @param err Error. * @throws IgniteCheckedException If failed. 
*/ - public void closeEx(boolean cancel, IgniteCheckedException err) throws IgniteCheckedException { + private IgniteCheckedException closeEx(boolean cancel, IgniteCheckedException err) throws IgniteCheckedException { if (!closed.compareAndSet(false, true)) - return; + return null; busyLock.block(); @@ -1029,7 +1206,14 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed throw e; } + long failed = failCntr.longValue(); + + if (failed > 0 && err == null) + err = new IgniteCheckedException("Some of DataStreamer operations failed [failedCount=" + failed + "]"); + fut.onDone(err); + + return err; } /** @@ -1139,6 +1323,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed /** */ private final Semaphore sem; + /** Batch topology. */ + private AffinityTopologyVersion batchTopVer; + /** Closure to signal on task finish. */ @GridToStringExclude private final IgniteInClosure<IgniteInternalFuture<Object>> signalC = new IgniteInClosure<IgniteInternalFuture<Object>>() { @@ -1169,37 +1356,64 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed } /** + * @param remap Remapping flag. + */ + private void renewBatch(boolean remap) { + entries = newEntries(); + curFut = new GridFutureAdapter<>(); + + batchTopVer = null; + + if (!remap) + curFut.listen(signalC); + } + + /** * @param newEntries Infos. * @param topVer Topology version. * @param lsnr Listener for the operation future. + * @param remap Remapping flag. * @return Future for operation. * @throws IgniteInterruptedCheckedException If failed. 
*/ @Nullable GridFutureAdapter<?> update(Iterable<DataStreamerEntry> newEntries, AffinityTopologyVersion topVer, - IgniteInClosure<IgniteInternalFuture<?>> lsnr) throws IgniteInterruptedCheckedException { + IgniteInClosure<IgniteInternalFuture<?>> lsnr, + boolean remap) throws IgniteInterruptedCheckedException { List<DataStreamerEntry> entries0 = null; + GridFutureAdapter<Object> curFut0; + AffinityTopologyVersion curBatchTopVer; + synchronized (this) { curFut0 = curFut; curFut0.listen(lsnr); + if (batchTopVer == null) + batchTopVer = topVer; + + curBatchTopVer = batchTopVer; + for (DataStreamerEntry entry : newEntries) entries.add(entry); if (entries.size() >= bufSize) { entries0 = entries; - entries = newEntries(); - curFut = new GridFutureAdapter<>(); - curFut.listen(signalC); + renewBatch(remap); } } - if (entries0 != null) { - submit(entries0, topVer, curFut0); + if (!allowOverwrite() && !topVer.equals(curBatchTopVer)) { + renewBatch(remap); + + curFut0.onDone(null, new IgniteCheckedException("Topology changed during batch preparation." + + "[batchTopVer=" + curBatchTopVer + ", topVer=" + topVer + "]")); + } + else if (entries0 != null) { + submit(entries0, curBatchTopVer, curFut0, remap); if (cancelled) curFut0.onDone(new IgniteCheckedException("Data streamer has been cancelled: " + @@ -1227,6 +1441,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed List<DataStreamerEntry> entries0 = null; GridFutureAdapter<Object> curFut0 = null; + acquireRemapSemaphore(); + synchronized (this) { if (!entries.isEmpty()) { entries0 = entries; @@ -1239,7 +1455,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed } if (entries0 != null) - submit(entries0, null, curFut0); + submit(entries0, batchTopVer, curFut0, false); // Create compound future for this flush. 
GridCompoundFuture<Object, Object> res = null; @@ -1290,25 +1506,113 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed } /** + * @param entries Entries. + * @param reqTopVer Request topology version. + * @param curFut Current future. + */ + private void localUpdate(final Collection<DataStreamerEntry> entries, + final AffinityTopologyVersion reqTopVer, + final GridFutureAdapter<Object> curFut) { + try { + GridCacheContext cctx = ctx.cache().internalCache(cacheName).context(); + + final boolean allowOverwrite = allowOverwrite(); + final boolean loc = cctx.isLocal(); + + if (!loc && !allowOverwrite) + cctx.topology().readLock(); + + try { + GridDhtTopologyFuture fut = loc ? null : cctx.topologyVersionFuture(); + + AffinityTopologyVersion topVer = loc ? reqTopVer : fut.topologyVersion(); + + if (!allowOverwrite && !topVer.equals(reqTopVer)) { + curFut.onDone(new IgniteCheckedException( + "DataStreamer will retry data transfer at stable topology. " + + "[reqTop=" + reqTopVer + " ,topVer=" + topVer + ", node=local]")); + } + else if (loc || allowOverwrite || fut.isDone()) { + IgniteInternalFuture<Object> callFut = ctx.closure().callLocalSafe( + new DataStreamerUpdateJob( + ctx, + log, + cacheName, + entries, + false, + skipStore, + keepBinary, + rcvr), + false); + + locFuts.add(callFut); + + final GridFutureAdapter waitFut = (loc || allowOverwrite) ? 
+ null : + cctx.mvcc().addDataStreamerFuture(topVer); + + callFut.listen(new IgniteInClosure<IgniteInternalFuture<Object>>() { + @Override public void apply(IgniteInternalFuture<Object> t) { + try { + boolean rmv = locFuts.remove(t); + + assert rmv; + + curFut.onDone(t.get()); + } + catch (IgniteCheckedException e) { + curFut.onDone(e); + } + finally { + if (waitFut != null) + waitFut.onDone(); + } + } + }); + } + else { + fut.listen(new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> e) { + localUpdate(entries, reqTopVer, curFut); + } + }); + } + } + finally { + if (!loc && !allowOverwrite) + cctx.topology().readUnlock(); + } + } + catch (Throwable ex) { + curFut.onDone(new IgniteCheckedException("DataStreamer data handling failed.", ex)); + } + } + + /** * @param entries Entries to submit. * @param topVer Topology version. * @param curFut Current future. + * @param remap Remapping flag. * @throws IgniteInterruptedCheckedException If interrupted. 
*/ private void submit(final Collection<DataStreamerEntry> entries, @Nullable AffinityTopologyVersion topVer, - final GridFutureAdapter<Object> curFut) + final GridFutureAdapter<Object> curFut, + boolean remap) throws IgniteInterruptedCheckedException { assert entries != null; assert !entries.isEmpty(); assert curFut != null; - try { - incrementActiveTasks(); - } - catch (IgniteDataStreamerTimeoutException e) { - curFut.onDone(e); - throw e; + if (!remap) { + try { + incrementActiveTasks(); + } + catch (IgniteDataStreamerTimeoutException e) { + curFut.onDone(e); + + throw e; + } } IgniteInternalFuture<Object> fut; @@ -1318,27 +1622,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed if (plc == null) plc = PUBLIC_POOL; - if (isLocNode && plc == GridIoPolicy.PUBLIC_POOL) { - fut = ctx.closure().callLocalSafe( - new DataStreamerUpdateJob(ctx, log, cacheName, entries, false, skipStore, keepBinary, rcvr), false); - - locFuts.add(fut); - - fut.listen(new IgniteInClosure<IgniteInternalFuture<Object>>() { - @Override public void apply(IgniteInternalFuture<Object> t) { - try { - boolean rmv = locFuts.remove(t); - - assert rmv; - - curFut.onDone(t.get()); - } - catch (IgniteCheckedException e) { - curFut.onDone(e); - } - } - }); - } + if (isLocNode && plc == GridIoPolicy.PUBLIC_POOL) + localUpdate(entries, topVer, curFut); else { try { for (DataStreamerEntry e : entries) { @@ -1466,8 +1751,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed /** * @param res Response. + * @param nodeId Node id. */ - void onResponse(DataStreamerResponse res) { + void onResponse(DataStreamerResponse res, UUID nodeId) { if (log.isDebugEnabled()) log.debug("Received data load response: " + res); @@ -1488,9 +1774,10 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed try { GridPeerDeployAware jobPda0 = jobPda; - err = U.unmarshal(ctx, - errBytes, - U.resolveClassLoader(jobPda0 != null ? 
jobPda0.classLoader() : null, ctx.config())); + err = new IgniteCheckedException("DataStreamer request failed [node=" + nodeId + "]", + (Throwable)U.unmarshal(ctx, + errBytes, + U.resolveClassLoader(jobPda0 != null ? jobPda0.classLoader() : null, ctx.config()))); } catch (IgniteCheckedException e) { f.onDone(null, new IgniteCheckedException("Failed to unmarshal response.", e)); @@ -1613,7 +1900,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed /** * Isolated receiver which only loads entry initial value. */ - private static class IsolatedUpdater implements StreamReceiver<KeyCacheObject, CacheObject>, + protected static class IsolatedUpdater implements StreamReceiver<KeyCacheObject, CacheObject>, DataStreamerCacheUpdaters.InternalUpdater { /** */ private static final long serialVersionUID = 0L; @@ -1630,7 +1917,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed GridCacheContext cctx = internalCache.context(); - AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion(); + AffinityTopologyVersion topVer = cctx.isLocal() ? 
+ cctx.affinity().affinityTopologyVersion() : + cctx.topology().topologyVersion(); GridCacheVersion ver = cctx.versions().isolatedStreamerVersion(); http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/main/java/org/apache/ignite/internal/util/GridLogThrottle.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridLogThrottle.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridLogThrottle.java index 745619a..ce6783a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridLogThrottle.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridLogThrottle.java @@ -72,7 +72,21 @@ public class GridLogThrottle { public static void error(@Nullable IgniteLogger log, @Nullable Throwable e, String msg) { assert !F.isEmpty(msg); - log(log, e, msg, null, LogLevel.ERROR, false); + log(log, e, msg, null, LogLevel.ERROR, false, false); + } + + /** + * Logs error if needed. + * + * @param log Logger. + * @param e Error (optional). + * @param msg Message. + * @param byMessage Errors group by message, not by tuple(error, msg). 
+ */ + public static void error(@Nullable IgniteLogger log, @Nullable Throwable e, String msg, boolean byMessage) { + assert !F.isEmpty(msg); + + log(log, e, msg, null, LogLevel.ERROR, false, byMessage); } /** @@ -85,7 +99,7 @@ public class GridLogThrottle { public static void warn(@Nullable IgniteLogger log, @Nullable Throwable e, String msg) { assert !F.isEmpty(msg); - log(log, e, msg, null, LogLevel.WARN, false); + log(log, e, msg, null, LogLevel.WARN, false, false); } /** @@ -99,7 +113,7 @@ public class GridLogThrottle { public static void warn(@Nullable IgniteLogger log, @Nullable Throwable e, String msg, boolean quite) { assert !F.isEmpty(msg); - log(log, e, msg, null, LogLevel.WARN, quite); + log(log, e, msg, null, LogLevel.WARN, quite, false); } /** @@ -113,7 +127,7 @@ public class GridLogThrottle { public static void warn(@Nullable IgniteLogger log, @Nullable Throwable e, String longMsg, @Nullable String shortMsg) { assert !F.isEmpty(longMsg); - log(log, e, longMsg, shortMsg, LogLevel.WARN, false); + log(log, e, longMsg, shortMsg, LogLevel.WARN, false, false); } /** @@ -126,7 +140,7 @@ public class GridLogThrottle { public static void info(@Nullable IgniteLogger log, String msg, boolean quite) { assert !F.isEmpty(msg); - log(log, null, msg, null, LogLevel.INFO, quite); + log(log, null, msg, null, LogLevel.INFO, quite, false); } /** @@ -154,14 +168,15 @@ public class GridLogThrottle { * @param longMsg Long message (or just message). * @param shortMsg Short message for quite logging. * @param level Level where messages should appear. + * @param byMessage Errors group by message, not by tuple(error, msg). */ @SuppressWarnings({"RedundantTypeArguments"}) private static void log(@Nullable IgniteLogger log, @Nullable Throwable e, String longMsg, @Nullable String shortMsg, - LogLevel level, boolean quiet) { + LogLevel level, boolean quiet, boolean byMessage) { assert !F.isEmpty(longMsg); IgniteBiTuple<Class<? extends Throwable>, String> tup = - e != null ? 
F.<Class<? extends Throwable>, String>t(e.getClass(), e.getMessage()) : + e != null && !byMessage ? F.<Class<? extends Throwable>, String>t(e.getClass(), e.getMessage()) : F.<Class<? extends Throwable>, String>t(null, longMsg); while (true) { http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java index 5bd6074..c92ea9e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java @@ -78,27 +78,37 @@ public class IgniteCacheDynamicStopSelfTest extends GridCommonAbstractTest { IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { /** {@inheritDoc} */ @Override public Object call() throws Exception { - try (IgniteDataStreamer<Integer, Integer> str = ignite(0).dataStreamer(null)) { - str.allowOverwrite(allowOverwrite); - - int i = 0; - - while (!stop.get()) { - str.addData(i % 10_000, i).listen(new CI1<IgniteFuture<?>>() { - @Override public void apply(IgniteFuture<?> f) { - try { - f.get(); - } - catch (CacheException ignore) { - // This may be debugged. - } + while (!stop.get()) { + try (IgniteDataStreamer<Integer, Integer> str = ignite(0).dataStreamer(null)) { + str.allowOverwrite(allowOverwrite); + + int i = 0; + + while (!stop.get()) { + try { + str.addData(i % 10_000, i).listen(new CI1<IgniteFuture<?>>() { + @Override public void apply(IgniteFuture<?> f) { + try { + f.get(); + } + catch (CacheException ignore) { + // This may be debugged. 
+ } + } + }); + } + catch (IllegalStateException ignored) { + break; } - }); - if (i > 0 && i % 10000 == 0) - info("Added: " + i); + if (i > 0 && i % 10000 == 0) + info("Added: " + i); - i++; + i++; + } + } + catch (IllegalStateException | CacheException ignored) { + // This may be debugged. } } @@ -114,6 +124,8 @@ public class IgniteCacheDynamicStopSelfTest extends GridCommonAbstractTest { Thread.sleep(500); ignite(0).createCache(ccfg); + + Thread.sleep(1000); } finally { stop.set(true); http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTest.java index 9da6cf7..0801691 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTest.java @@ -18,6 +18,9 @@ package org.apache.ignite.internal.processors.cache.distributed; import java.io.Serializable; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.Set; import java.util.concurrent.Callable; import javax.cache.Cache; import javax.cache.configuration.FactoryBuilder; @@ -28,32 +31,47 @@ import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cache.store.CacheStoreAdapter; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import 
org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.P1; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiInClosure; +import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_GRID_NAME; +import static org.apache.ignite.testframework.GridTestUtils.runAsync; /** * Tests for cache data loading during simultaneous grids start. */ -public class CacheLoadingConcurrentGridStartSelfTest extends GridCommonAbstractTest { - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-500"); - } - +public class CacheLoadingConcurrentGridStartSelfTest extends GridCommonAbstractTest implements Serializable { /** Grids count */ private static int GRIDS_CNT = 5; /** Keys count */ private static int KEYS_CNT = 1_000_000; + /** Client. */ + private volatile boolean client; + + /** Config. */ + private volatile boolean configured; + + /** Allow override. */ + protected volatile boolean allowOverwrite; + + /** Restarts. 
*/ + protected volatile boolean restarts; + /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { @@ -67,7 +85,24 @@ public class CacheLoadingConcurrentGridStartSelfTest extends GridCommonAbstractT ccfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(new TestCacheStoreAdapter())); - cfg.setCacheConfiguration(ccfg); + if (getTestGridName(0).equals(gridName)) { + if (client) + cfg.setClientMode(true); + + if (configured) + cfg.setCacheConfiguration(ccfg); + } + else + cfg.setCacheConfiguration(ccfg); + + if (!configured) + ccfg.setNodeFilter(new P1<ClusterNode>() { + @Override public boolean apply(ClusterNode node) { + String name = node.attribute(ATTR_GRID_NAME).toString(); + + return !getTestGridName(0).equals(name); + } + }); return cfg; } @@ -81,22 +116,35 @@ public class CacheLoadingConcurrentGridStartSelfTest extends GridCommonAbstractT * @throws Exception if failed */ public void testLoadCacheWithDataStreamer() throws Exception { - IgniteInClosure<Ignite> f = new IgniteInClosure<Ignite>() { - @Override public void apply(Ignite grid) { - try (IgniteDataStreamer<Integer, String> dataStreamer = grid.dataStreamer(null)) { - for (int i = 0; i < KEYS_CNT; i++) - dataStreamer.addData(i, Integer.toString(i)); + configured = true; + + try { + IgniteInClosure<Ignite> f = new IgniteInClosure<Ignite>() { + @Override public void apply(Ignite grid) { + try (IgniteDataStreamer<Integer, String> dataStreamer = grid.dataStreamer(null)) { + dataStreamer.allowOverwrite(allowOverwrite); + + for (int i = 0; i < KEYS_CNT; i++) + dataStreamer.addData(i, Integer.toString(i)); + } + + log.info("Data loaded."); } - } - }; + }; - loadCache(f); + loadCache(f); + } + finally { + configured = false; + } } /** * @throws Exception if failed */ public void testLoadCacheFromStore() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-4210"); + loadCache(new 
IgniteInClosure<Ignite>() { @Override public void apply(Ignite grid) { grid.cache(null).loadCache(null); @@ -105,12 +153,177 @@ public class CacheLoadingConcurrentGridStartSelfTest extends GridCommonAbstractT } /** + * @throws Exception if failed + */ + public void testLoadCacheWithDataStreamerSequentialClient() throws Exception { + client = true; + + try { + loadCacheWithDataStreamerSequential(); + } + finally { + client = false; + } + } + + /** + * @throws Exception if failed + */ + public void testLoadCacheWithDataStreamerSequentialClientWithConfig() throws Exception { + client = true; + configured = true; + + try { + loadCacheWithDataStreamerSequential(); + } + finally { + client = false; + configured = false; + } + } + + /** + * @throws Exception if failed + */ + public void testLoadCacheWithDataStreamerSequential() throws Exception { + loadCacheWithDataStreamerSequential(); + } + + /** + * @throws Exception if failed + */ + public void testLoadCacheWithDataStreamerSequentialWithConfigAndRestarts() throws Exception { + restarts = true; + configured = true; + + try { + loadCacheWithDataStreamerSequential(); + } + finally { + restarts = false; + configured = false; + } + } + + /** + * @throws Exception if failed + */ + public void testLoadCacheWithDataStreamerSequentialWithConfig() throws Exception { + configured = true; + + try { + loadCacheWithDataStreamerSequential(); + } + finally { + configured = false; + } + } + + /** + * @throws Exception if failed + */ + private void loadCacheWithDataStreamerSequential() throws Exception { + startGrid(1); + + Ignite g0 = startGrid(0); + + IgniteInternalFuture<Object> restartFut = runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + while (restarts) { + stopGrid(1); + + startGrid(1); + + U.sleep(100); + } + + return null; + } + }); + + IgniteInternalFuture<Object> fut = runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + for (int i = 2; i < GRIDS_CNT; 
i++) + startGrid(i); + + return null; + } + }); + + final HashSet<IgniteFuture> set = new HashSet<>(); + + IgniteInClosure<Ignite> f = new IgniteInClosure<Ignite>() { + @Override public void apply(Ignite grid) { + try (IgniteDataStreamer<Integer, String> dataStreamer = grid.dataStreamer(null)) { + dataStreamer.allowOverwrite(allowOverwrite); + + for (int i = 0; i < KEYS_CNT; i++) { + set.add(dataStreamer.addData(i, "Data")); + + if (i % 100000 == 0) + log.info("Streaming " + i + "'th entry."); + } + } + } + }; + + f.apply(g0); + + log.info("Data loaded."); + + restarts = false; + + fut.get(); + restartFut.get(); + + for (IgniteFuture res : set) + assertNull(res.get()); + + IgniteCache<Integer, String> cache = grid(0).cache(null); + + long size = cache.size(CachePeekMode.PRIMARY); + + if (size != KEYS_CNT) { + Set<Integer> failedKeys = new LinkedHashSet<>(); + + for (int i = 0; i < KEYS_CNT; i++) + if (!cache.containsKey(i)) { + log.info("Actual cache size: " + size); + + for (Ignite ignite : G.allGrids()) { + IgniteEx igniteEx = (IgniteEx)ignite; + + log.info("Missed key info:" + + igniteEx.localNode().id() + + " primary=" + + ignite.affinity(null).isPrimary(igniteEx.localNode(), i) + + " backup=" + + ignite.affinity(null).isBackup(igniteEx.localNode(), i) + + " local peek=" + + ignite.cache(null).localPeek(i, CachePeekMode.ONHEAP)); + } + + for (int j = i; j < i + 10000; j++) { + if (!cache.containsKey(j)) + failedKeys.add(j); + } + + break; + } + + assert failedKeys.isEmpty() : "Some failed keys: " + failedKeys.toString(); + } + + assertCacheSize(); + } + + /** * Loads cache using closure and asserts cache size. 
* * @param f cache loading closure * @throws Exception if failed */ - private void loadCache(IgniteInClosure<Ignite> f) throws Exception { + protected void loadCache(IgniteInClosure<Ignite> f) throws Exception { Ignite g0 = startGrid(0); IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Ignite>() { @@ -130,17 +343,17 @@ public class CacheLoadingConcurrentGridStartSelfTest extends GridCommonAbstractT } /** Asserts cache size. */ - private void assertCacheSize() { + protected void assertCacheSize() { IgniteCache<Integer, String> cache = grid(0).cache(null); - assertEquals(KEYS_CNT, cache.size(CachePeekMode.PRIMARY)); + assertEquals("Data lost.", KEYS_CNT, cache.size(CachePeekMode.PRIMARY)); int total = 0; for (int i = 0; i < GRIDS_CNT; i++) total += grid(i).cache(null).localSize(CachePeekMode.PRIMARY); - assertEquals(KEYS_CNT, total); + assertEquals("Data lost.", KEYS_CNT, total); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTestAllowOverwrite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTestAllowOverwrite.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTestAllowOverwrite.java new file mode 100644 index 0000000..c9cd9fa --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTestAllowOverwrite.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +/** + * + */ +public class CacheLoadingConcurrentGridStartSelfTestAllowOverwrite extends CacheLoadingConcurrentGridStartSelfTest { + /** + * Default constructor. + */ + public CacheLoadingConcurrentGridStartSelfTestAllowOverwrite() { + allowOverwrite = true; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java index 9fedc35..0f8ae29 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java @@ -29,9 +29,9 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import javax.cache.Cache; +import javax.cache.CacheException; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; -import 
org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CachePeekMode; @@ -194,7 +194,7 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest { assert false; } - catch (IgniteCheckedException e) { + catch (CacheException e) { // Cannot load local cache configured remotely. info("Caught expected exception: " + e); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java index 0c6686f..a6a9f54 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java @@ -22,13 +22,17 @@ import java.util.Map; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.CyclicBarrier; +import javax.cache.CacheException; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.cache.CacheServerNotFoundException; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import 
org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -50,6 +54,16 @@ public class DataStreamerImplSelfTest extends GridCommonAbstractTest { /** Started grid counter. */ private static int cnt; + /** No nodes filter. */ + private static volatile boolean noNodesFilter; + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); @@ -72,88 +86,149 @@ public class DataStreamerImplSelfTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testNullPointerExceptionUponDataStreamerClosing() throws Exception { - try { - startGrids(5); + startGrids(5); - final CyclicBarrier barrier = new CyclicBarrier(2); + final CyclicBarrier barrier = new CyclicBarrier(2); - multithreadedAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - U.awaitQuiet(barrier); + multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + U.awaitQuiet(barrier); - G.stopAll(true); + G.stopAll(true); - return null; - } - }, 1); + return null; + } + }, 1); - Ignite g4 = grid(4); + Ignite g4 = grid(4); - IgniteDataStreamer<Object, Object> dataLdr = g4.dataStreamer(null); + IgniteDataStreamer<Object, Object> dataLdr = g4.dataStreamer(null); - dataLdr.perNodeBufferSize(32); + dataLdr.perNodeBufferSize(32); - for (int i = 0; i < 100000; i += 2) { - dataLdr.addData(i, i); - dataLdr.removeData(i + 1); - } + for (int i = 0; i < 100000; i += 2) { + dataLdr.addData(i, i); + dataLdr.removeData(i + 1); + } - U.awaitQuiet(barrier); + U.awaitQuiet(barrier); - info("Closing data streamer."); + info("Closing data streamer."); - try { - dataLdr.close(true); - } - catch (IllegalStateException ignore) { - // This is ok to ignore this exception as test is racy by it's 
nature - - // grid is stopping in different thread. - } + try { + dataLdr.close(true); } - finally { - G.stopAll(true); + catch (CacheException | IllegalStateException ignore) { + // This is ok to ignore this exception as test is racy by it's nature - + // grid is stopping in different thread. } } /** * Data streamer should correctly load entries from HashMap in case of grids with more than one node - * and with GridOptimizedMarshaller that requires serializable. + * and with GridOptimizedMarshaller that requires serializable. * * @throws Exception If failed. */ public void testAddDataFromMap() throws Exception { - try { - cnt = 0; + cnt = 0; - startGrids(2); + startGrids(2); - Ignite g0 = grid(0); + Ignite g0 = grid(0); - IgniteDataStreamer<Integer, String> dataLdr = g0.dataStreamer(null); + IgniteDataStreamer<Integer, String> dataLdr = g0.dataStreamer(null); - Map<Integer, String> map = U.newHashMap(KEYS_COUNT); + Map<Integer, String> map = U.newHashMap(KEYS_COUNT); - for (int i = 0; i < KEYS_COUNT; i ++) - map.put(i, String.valueOf(i)); + for (int i = 0; i < KEYS_COUNT; i++) + map.put(i, String.valueOf(i)); - dataLdr.addData(map); + dataLdr.addData(map); - dataLdr.close(); + dataLdr.close(); - Random rnd = new Random(); + Random rnd = new Random(); - IgniteCache<Integer, String> c = g0.cache(null); + IgniteCache<Integer, String> c = g0.cache(null); - for (int i = 0; i < KEYS_COUNT; i ++) { - Integer k = rnd.nextInt(KEYS_COUNT); + for (int i = 0; i < KEYS_COUNT; i++) { + Integer k = rnd.nextInt(KEYS_COUNT); - String v = c.get(k); + String v = c.get(k); + + assertEquals(k.toString(), v); + } + } + + /** + * Test logging on {@code DataStreamer.addData()} method when cache have no data nodes + * + * @throws Exception If fail. 
+ */ + public void testNoDataNodesOnClose() throws Exception { + boolean failed = false; + + cnt = 0; + + noNodesFilter = true; + + try { + Ignite ignite = startGrid(1); - assertEquals(k.toString(), v); + try (IgniteDataStreamer<Integer, String> streamer = ignite.dataStreamer(null)) { + streamer.addData(1, "1"); + } + catch (CacheException ex) { + failed = true; } } finally { - G.stopAll(true); + noNodesFilter = false; + + assertTrue(failed); + } + } + + /** + * Test logging on {@code DataStreamer.addData()} method when cache have no data nodes + * + * @throws Exception If fail. + */ + public void testNoDataNodesOnFlush() throws Exception { + boolean failed = false; + + cnt = 0; + + noNodesFilter = true; + + try { + Ignite ignite = startGrid(1); + + IgniteFuture fut = null; + + try (IgniteDataStreamer<Integer, String> streamer = ignite.dataStreamer(null)) { + fut = streamer.addData(1, "1"); + + streamer.flush(); + } + catch (IllegalStateException ex) { + try { + fut.get(); + + fail("DataStreamer ignores failed streaming."); + } + catch (CacheServerNotFoundException ignored) { + // No-op. + } + + failed = true; + } + } + finally { + noNodesFilter = false; + + assertTrue(failed); } } @@ -169,6 +244,9 @@ public class DataStreamerImplSelfTest extends GridCommonAbstractTest { cacheCfg.setBackups(1); cacheCfg.setWriteSynchronizationMode(FULL_SYNC); + if (noNodesFilter) + cacheCfg.setNodeFilter(F.alwaysFalse()); + return cacheCfg; }
