IGNITE-7533 Throttle writing threads according to fsync progress - Fixes #3437.
Signed-off-by: Alexey Goncharuk <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b50aa5eb Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b50aa5eb Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b50aa5eb Branch: refs/heads/master Commit: b50aa5eb0b3d8a641b574401b8d79e3ca612756a Parents: e8bd98d Author: dpavlov <[email protected]> Authored: Wed Feb 7 10:37:51 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Wed Feb 7 10:37:51 2018 +0300 ---------------------------------------------------------------------- .../GridCacheDatabaseSharedManager.java | 86 ++- .../pagemem/IntervalBasedMeasurement.java | 304 +++++++++++ .../persistence/pagemem/PageMemoryImpl.java | 108 +++- .../pagemem/PagesWriteSpeedBasedThrottle.java | 519 +++++++++++++++++++ .../persistence/pagemem/PagesWriteThrottle.java | 21 +- .../pagemem/PagesWriteThrottlePolicy.java | 39 ++ ...gniteCheckpointDirtyPagesForLowLoadTest.java | 21 +- .../checkpoint/IgniteMassLoadSandboxTest.java | 515 ++++++++++++++++++ .../db/checkpoint/ProgressWatchdog.java | 495 ++++++++++++++++++ .../pagemem/BPlusTreePageMemoryImplTest.java | 2 +- .../BPlusTreeReuseListPageMemoryImplTest.java | 2 +- .../pagemem/IgniteThrottlingUnitTest.java | 270 ++++++++++ .../pagemem/IndexStoragePageMemoryImplTest.java | 2 +- .../pagemem/PageMemoryImplNoLoadTest.java | 2 +- .../persistence/pagemem/PageMemoryImplTest.java | 2 +- .../ignite/testsuites/IgnitePdsTestSuite.java | 1 - .../testsuites/IgnitePdsUnitTestSuite.java | 31 ++ .../testsuites/IgniteReproducingSuite.java | 6 +- 18 files changed, 2362 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b50aa5eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index 5dc81c5..bd80ec8 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -55,6 +55,7 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -129,7 +130,6 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageParti import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer; import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32; import org.apache.ignite.internal.processors.port.GridPortRecord; -import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.GridMultiCollectionWrapper; import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.IgniteUtils; @@ -156,6 +156,7 @@ import org.apache.ignite.thread.IgniteThread; import org.apache.ignite.thread.IgniteThreadPoolExecutor; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import org.jsr166.ConcurrentLinkedHashMap; import static java.nio.file.StandardOpenOption.READ; import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE; @@ -355,7 +356,13 @@ 
public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** Counter for written checkpoint pages. Not null only if checkpoint is running. */ private volatile AtomicInteger writtenPagesCntr = null; - /** Number of pages in current checkpoint. */ + /** Counter for fsynced checkpoint pages. Not null only if checkpoint is running. */ + private volatile AtomicInteger syncedPagesCntr = null; + + /** Counter for evictted checkpoint pages. Not null only if checkpoint is running. */ + private volatile AtomicInteger evictedPagesCntr = null; + + /** Number of pages in current checkpoint at the beginning of checkpoint. */ private volatile int currCheckpointPagesCnt; /** */ @@ -933,10 +940,18 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan chpBufSize = cacheSize; } - boolean writeThrottlingEnabled = persistenceCfg.isWriteThrottlingEnabled(); + PageMemoryImpl.ThrottlingPolicy plc = persistenceCfg.isWriteThrottlingEnabled() + ? PageMemoryImpl.ThrottlingPolicy.SPEED_BASED + : PageMemoryImpl.ThrottlingPolicy.NONE; + + String val = IgniteSystemProperties.getString(IgniteSystemProperties.IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED); - if (IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED, false)) - writeThrottlingEnabled = true; + if (val != null) { + if ("ratio".equalsIgnoreCase(val)) + plc = PageMemoryImpl.ThrottlingPolicy.TARGET_RATIO_BASED; + else if ("speed".equalsIgnoreCase(val) || Boolean.valueOf(val)) + plc = PageMemoryImpl.ThrottlingPolicy.SPEED_BASED; + } GridInClosure3X<Long, FullPageId, PageMemoryEx> changeTracker; @@ -974,12 +989,17 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan // Only after write we can write page into snapshot. 
snapshotMgr.flushDirtyPageHandler(fullId, pageBuf, tag); + + AtomicInteger cntr = evictedPagesCntr; + + if (cntr != null) + cntr.incrementAndGet(); } }, changeTracker, this, memMetrics, - writeThrottlingEnabled + plc ); memMetrics.pageMemory(pageMem); @@ -2610,6 +2630,20 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } /** + * @return Counter for fsynced checkpoint pages. Not null only if checkpoint is running. + */ + public AtomicInteger syncedPagesCounter() { + return syncedPagesCntr; + } + + /** + * @return Counter for evicted pages during current checkpoint. Not null only if checkpoint is running. + */ + public AtomicInteger evictedPagesCntr() { + return evictedPagesCntr; + } + + /** * @return Number of pages in current checkpoint. If checkpoint is not running, returns 0. */ public int currentCheckpointPagesCount() { @@ -2799,13 +2833,15 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan currCheckpointPagesCnt = chp.pagesSize; writtenPagesCntr = new AtomicInteger(); + syncedPagesCntr = new AtomicInteger(); + evictedPagesCntr = new AtomicInteger(); boolean interrupted = true; try { if (chp.hasDelta()) { // Identity stores set. - GridConcurrentHashSet<PageStore> updStores = new GridConcurrentHashSet<>(); + ConcurrentLinkedHashMap<PageStore, LongAdder> updStores = new ConcurrentLinkedHashMap<>(); CountDownFuture doneWriteFut = new CountDownFuture( asyncRunner == null ? 
1 : chp.cpPages.collectionsSize()); @@ -2867,14 +2903,16 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan tracker.onFsyncStart(); if (!skipSync) { - for (PageStore updStore : updStores) { + for (Map.Entry<PageStore, LongAdder> updStoreEntry : updStores.entrySet()) { if (shutdownNow) { chp.progress.cpFinishFut.onDone(new NodeStoppingException("Node is stopping.")); return; } - updStore.sync(); + updStoreEntry.getKey().sync(); + + syncedPagesCntr.addAndGet(updStoreEntry.getValue().intValue()); } } } @@ -3035,14 +3073,12 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan if (grp.isLocal() || !grp.walEnabled()) continue; - List<GridDhtLocalPartition> locParts = new ArrayList<>(); + int locPartsSize = 0; - for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions()) - locParts.add(part); + for (GridDhtLocalPartition ignored : grp.topology().currentLocalPartitions()) + locPartsSize++; - Collections.sort(locParts, ASC_PART_COMPARATOR); - - CacheState state = new CacheState(locParts.size()); + CacheState state = new CacheState(locPartsSize); for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions()) { state.addPartitionState( @@ -3184,6 +3220,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan res.add(nextCpPagesCol); } + currCheckpointPagesCnt = pagesNum; + return new IgniteBiTuple<>(res, pagesNum); } @@ -3192,6 +3230,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan */ private void markCheckpointEnd(Checkpoint chp) throws IgniteCheckedException { synchronized (this) { + writtenPagesCntr = null; + syncedPagesCntr = null; + evictedPagesCntr = null; + for (DataRegion memPlc : dataRegions()) { if (!memPlc.config().isPersistenceEnabled()) continue; @@ -3208,8 +3250,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan null, CheckpointEntryType.END); - writtenPagesCntr = null; - 
currCheckpointPagesCnt = 0; } @@ -3261,7 +3301,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } if (persistenceCfg.getCheckpointWriteOrder() == CheckpointWriteOrder.SEQUENTIAL) { - Collections.sort(cpPagesList, new Comparator<FullPageId>() { + FullPageId[] objects = cpPagesList.toArray(new FullPageId[cpPagesList.size()]); + + Arrays.parallelSort(objects, new Comparator<FullPageId>() { @Override public int compare(FullPageId o1, FullPageId o2) { int cmp = Long.compare(o1.groupId(), o2.groupId()); if (cmp != 0) @@ -3271,6 +3313,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan PageIdUtils.effectivePageId(o2.pageId())); } }); + + cpPagesList = Arrays.asList(objects); } int cpThreads = persistenceCfg.getCheckpointThreads(); @@ -3302,7 +3346,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan private Collection<FullPageId> writePageIds; /** */ - private GridConcurrentHashSet<PageStore> updStores; + private ConcurrentLinkedHashMap<PageStore, LongAdder> updStores; /** */ private CountDownFuture doneFut; @@ -3322,7 +3366,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan private WriteCheckpointPages( final CheckpointMetricsTracker tracker, final Collection<FullPageId> writePageIds, - final GridConcurrentHashSet<PageStore> updStores, + final ConcurrentLinkedHashMap<PageStore, LongAdder> updStores, final CountDownFuture doneFut, final int totalPagesToWrite) { this.tracker = tracker; @@ -3398,7 +3442,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan PageStore store = storeMgr.writeInternal(grpId, fullId.pageId(), tmpWriteBuf, tag, false); - updStores.add(store); + updStores.computeIfAbsent(store, k -> new LongAdder()).increment(); } } 
http://git-wip-us.apache.org/repos/asf/ignite/blob/b50aa5eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IntervalBasedMeasurement.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IntervalBasedMeasurement.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IntervalBasedMeasurement.java new file mode 100644 index 0000000..db4da6b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IntervalBasedMeasurement.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.pagemem; + +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * Speed tracker for determine speed of processing based on increments or exact counters value. 
<br> + * Measurement is performed using several intervals (1 current + 3 historical by default). <br> + * Too old measurements (intervals) may be dropped if automatic switch mode activated.<br> + * To determine speed current measurement is reduced with all historical.<br> + * <br> + * For mode of manual measurements switch it is possible to use + * <br> default ctor + * {@link #IntervalBasedMeasurement()} and methods <br> + * {@link #setCounter(long, long)} (automatically opens interval if not opened) and <br> + * {@link #finishInterval()} to close measurement.<br> + * <br> + * For mode of automatic measurements switch it is possible to use + * <br> parametrized ctor + * {@link #IntervalBasedMeasurement(int, int)} and methods <br> + * {@link #setCounter(long, long)} (automatically opens interval if not opened) or + * {@link #addMeasurementForAverageCalculation(long)} to provide metrics value in addition to event.<br> + * {@link #finishInterval()} is also supported, but not required<br> + * <br> + * + * To get results of speed calculation it is possible to use <br> + * Method {@link #getSpeedOpsPerSec(long)} to get current speed (and swicth/open interval if needed). <br> + * or method {@link #getSpeedOpsPerSecReadOnly()} to get current speed without interval modification.<br> + * + * If metric value was provided using {@link #addMeasurementForAverageCalculation(long)} + * then method {@link #getAverage()} can be used to get resulting metrics average value during period of time. + */ +class IntervalBasedMeasurement { + /** Nanos in second. */ + private static final long NANOS_IN_SECOND = TimeUnit.SECONDS.toNanos(1); + + /** Current Measurement interval atomic reference. */ + private AtomicReference<MeasurementInterval> measurementIntervalAtomicRef = new AtomicReference<>(); + + /** Interval automatic switch nanoseconds. Negative value means no automatic switch. */ + private final long intervalSwitchNanos; + + /** Max historical measurements to keep. 
*/ + private final int maxMeasurements; + + /** + * Previous (historical) measurements. One thread can write (winner in CAS of {@link + * #measurementIntervalAtomicRef}), all other threads may read. + */ + private final ConcurrentLinkedQueue<MeasurementInterval> prevMeasurements = new ConcurrentLinkedQueue<>(); + + /** + * Default constructor. No automatic switch, 3 historical measurements. + */ + IntervalBasedMeasurement() { + this(-1, 3); + } + + /** + * @param intervalSwitchMs Interval switch milliseconds. + * @param maxMeasurements Max historical measurements to keep. + */ + IntervalBasedMeasurement(int intervalSwitchMs, int maxMeasurements) { + this.intervalSwitchNanos = intervalSwitchMs > 0 ? intervalSwitchMs * TimeUnit.MILLISECONDS.toNanos(1) : -1; + this.maxMeasurements = maxMeasurements; + } + + /** + * Gets speed, start interval (if not started). + * + * @param curNanoTime current time nanos. + * @return speed in pages per second based on current data. + */ + long getSpeedOpsPerSec(long curNanoTime) { + return calcSpeed(interval(curNanoTime), curNanoTime); + } + + /** + * Gets current speed, does not start measurement. + * + * @return speed in pages per second based on current data. + */ + long getSpeedOpsPerSecReadOnly() { + MeasurementInterval interval = measurementIntervalAtomicRef.get(); + + long curNanoTime = System.nanoTime(); + + return calcSpeed(interval, curNanoTime); + } + + /** + * Reduce measurements to calculate average speed. + * + * @param interval current measurement. + * @param curNanoTime current time in nanoseconds. + * @return speed in operations per second from historical only measurements. 
+ */ + private long calcSpeed(@Nullable MeasurementInterval interval, long curNanoTime) { + long nanosPassed = 0; + long opsDone = 0; + + if (!isOutdated(interval, curNanoTime)) { + nanosPassed += curNanoTime - interval.startNanoTime; + opsDone += interval.cntr.get(); + } + + for (MeasurementInterval prevMeasurement : prevMeasurements) { + if (!isOutdated(prevMeasurement, curNanoTime)) { + nanosPassed += prevMeasurement.endNanoTime - prevMeasurement.startNanoTime; + opsDone += prevMeasurement.cntr.get(); + } + } + + return nanosPassed <= 0 ? 0 : opsDone * NANOS_IN_SECOND / nanosPassed; + } + + + + /** + * @param interval Measurement to check. {@code null} is always outdated. + * @param curNanoTime Current time in nanoseconds. + * @return {@code True} if measurement is outdated. + */ + private boolean isOutdated(@Nullable final MeasurementInterval interval, long curNanoTime) { + if (interval == null) + return true; + + long elapsedNs = curNanoTime - interval.startNanoTime; + + if (elapsedNs <= 0) + return true; // interval is started only now + + return (intervalSwitchNanos > 0) + && elapsedNs > (maxMeasurements + 1) * intervalSwitchNanos; + } + + /** + * Gets or creates measurement interval, performs switch to new measurement by timeout. + * @param curNanoTime current nano time. + * @return interval to use. 
+ */ + @NotNull private MeasurementInterval interval(long curNanoTime) { + MeasurementInterval interval; + + do { + interval = measurementIntervalAtomicRef.get(); + if (interval == null) { + MeasurementInterval newInterval = new MeasurementInterval(curNanoTime); + + if (measurementIntervalAtomicRef.compareAndSet(null, newInterval)) + interval = newInterval; + else + continue; + } + + if (intervalSwitchNanos > 0 && (curNanoTime - interval.startNanoTime) > intervalSwitchNanos) { + MeasurementInterval newInterval = new MeasurementInterval(curNanoTime); + + if (measurementIntervalAtomicRef.compareAndSet(interval, newInterval)) { + interval.endNanoTime = curNanoTime; + + pushToHistory(interval); + } + } + } + while (interval == null); + + return interval; + } + + /** + * @param interval finished interval to push to history. + */ + private void pushToHistory(MeasurementInterval interval) { + prevMeasurements.offer(interval); + + if (prevMeasurements.size() > maxMeasurements) + prevMeasurements.remove(); + } + + /** + * Set exact value for counter in current measurement interval, useful only for manually managed measurements. + * + * @param val new value to set. + * @param curNanoTime current nano time. + */ + void setCounter(long val, long curNanoTime) { + interval(curNanoTime).cntr.set(val); + } + + /** + * Manually switch interval to empty (not started measurement). + */ + void finishInterval() { + while (true) { + MeasurementInterval interval = measurementIntervalAtomicRef.get(); + + if (interval == null) + return; + + if (measurementIntervalAtomicRef.compareAndSet(interval, null)) { + interval.endNanoTime = System.nanoTime(); + + pushToHistory(interval); + + return; + } + } + } + + /** + * Gets average metric value previously reported by {@link #addMeasurementForAverageCalculation(long)}. + * This method may start new interval measurement or switch current. + * + * @return average metric value. 
+ */ + public long getAverage() { + long time = System.nanoTime(); + + return avgMeasurementWithHistorical(interval(time), time); + } + + /** + * Reduce measurements to calculate average value. + * + * @param interval current measurement. If null only historical is used. + * @param curNanoTime current time nanoseconds + * @return speed in page per second. + */ + private long avgMeasurementWithHistorical(@Nullable MeasurementInterval interval, long curNanoTime) { + long cnt = 0; + long sum = 0; + if (!isOutdated(interval, curNanoTime)) { + cnt += interval.cntr.get(); + sum += interval.sum.get(); + } + for (MeasurementInterval prevMeasurement : prevMeasurements) { + if (!isOutdated(prevMeasurement, curNanoTime)) { + cnt += prevMeasurement.cntr.get(); + sum += prevMeasurement.sum.get(); + } + } + + return cnt <= 0 ? 0 : sum / cnt; + } + + /** + * Adds measurement to be used for average calculation. Calling this method will later calculate speed of + * measurements come. Result can be taken from {@link #getAverage()}. + * + * @param val value measured now, to be used for average calculation. + */ + void addMeasurementForAverageCalculation(long val) { + MeasurementInterval interval = interval(System.nanoTime()); + + interval.cntr.incrementAndGet(); + interval.sum.addAndGet(val); + } + + /** + * Measurement interval, completed or open. + */ + private static class MeasurementInterval { + /** Counter of performed operations, pages. */ + private AtomicLong cntr = new AtomicLong(); + + /** Sum of measured value, used only for average calculation. */ + private AtomicLong sum = new AtomicLong(); + + /** Timestamp in nanoseconds of measurement start. */ + private final long startNanoTime; + + /** Timestamp in nanoseconds of measurement end. 0 for open (running) measurements.*/ + private volatile long endNanoTime; + + /** + * @param startNanoTime Timestamp of measurement start. 
+ */ + MeasurementInterval(long startNanoTime) { + this.startNanoTime = startNanoTime; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b50aa5eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java index 496a7b1..e4c369d 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java @@ -226,10 +226,10 @@ public class PageMemoryImpl implements PageMemoryEx { @Nullable private final GridInClosure3X<Long, FullPageId, PageMemoryEx> changeTracker; /** Pages write throttle. */ - private PagesWriteThrottle writeThrottle; + private PagesWriteThrottlePolicy writeThrottle; - /** Write throttle enabled flag. */ - private boolean throttleEnabled; + /** Write throttle type. */ + private ThrottlingPolicy throttlingPlc; /** */ private boolean pageEvictWarned; @@ -246,7 +246,7 @@ public class PageMemoryImpl implements PageMemoryEx { * @param pageSize Page size. * @param flushDirtyPage Callback invoked when a dirty page is evicted. * @param changeTracker Callback invoked to track changes in pages. - * @param throttleEnabled Write throttle enabled flag. + * @param throttlingPlc Write throttle enabled and type. 
*/ public PageMemoryImpl( DirectMemoryProvider directMemoryProvider, @@ -257,7 +257,7 @@ public class PageMemoryImpl implements PageMemoryEx { @Nullable GridInClosure3X<Long, FullPageId, PageMemoryEx> changeTracker, CheckpointLockStateChecker stateChecker, DataRegionMetricsImpl memMetrics, - boolean throttleEnabled + ThrottlingPolicy throttlingPlc ) { assert ctx != null; @@ -269,7 +269,7 @@ public class PageMemoryImpl implements PageMemoryEx { this.flushDirtyPage = flushDirtyPage; this.changeTracker = changeTracker; this.stateChecker = stateChecker; - this.throttleEnabled = throttleEnabled; + this.throttlingPlc = throttlingPlc; storeMgr = ctx.pageStore(); walMgr = ctx.wal(); @@ -320,7 +320,7 @@ public class PageMemoryImpl implements PageMemoryEx { totalAllocated += reg.size(); - segments[i] = new Segment(i, regions.get(i), checkpointPool.pages() / segments.length, throttleEnabled); + segments[i] = new Segment(i, regions.get(i), checkpointPool.pages() / segments.length, throttlingPlc); pages += segments[i].pages(); totalTblSize += segments[i].tableSize(); @@ -344,11 +344,17 @@ public class PageMemoryImpl implements PageMemoryEx { log.error("Write throttle can't start. 
Unexpected class of database manager: " + ctx.database().getClass()); - throttleEnabled = false; + throttlingPlc = ThrottlingPolicy.NONE; } - if (throttleEnabled) - writeThrottle = new PagesWriteThrottle(this, (GridCacheDatabaseSharedManager)ctx.database()); + if (isThrottlingEnabled()) { + GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)ctx.database(); + + if (throttlingPlc == ThrottlingPolicy.SPEED_BASED) + writeThrottle = new PagesWriteSpeedBasedThrottle(this, db, log); + else if(throttlingPlc == ThrottlingPolicy.TARGET_RATIO_BASED) + writeThrottle = new PagesWriteThrottle(this, db); + } } /** {@inheritDoc} */ @@ -431,6 +437,9 @@ public class PageMemoryImpl implements PageMemoryEx { assert ctx.database().checkpointLockIsHeldByThread(); + if (isThrottlingEnabled()) + writeThrottle.onMarkDirty(false); + long pageId = storeMgr.allocatePage(cacheId, partId, flags); assert PageIdUtils.pageIndex(pageId) > 0; //it's crucial for tracking pages (zero page is super one) @@ -876,6 +885,32 @@ public class PageMemoryImpl implements PageMemoryEx { return false; } + /** + * @return Max dirty ratio from the segments. + */ + double getDirtyPagesRatio() { + double res = 0; + + for (Segment segment : segments) { + res = Math.max(res, segment.getDirtyPagesRatio()); + } + + return res; + } + + /** + * @return Total pages can be placed in all segments. + */ + public long totalPages() { + long res = 0; + + for (Segment segment : segments) { + res += segment.pages(); + } + + return res; + } + /** {@inheritDoc} */ @Override public GridMultiCollectionWrapper<FullPageId> beginCheckpoint() throws IgniteException { if (segments == null) @@ -896,9 +931,19 @@ public class PageMemoryImpl implements PageMemoryEx { memMetrics.resetDirtyPages(); + if (isThrottlingEnabled()) + writeThrottle.onBeginCheckpoint(); + return new GridMultiCollectionWrapper<>(collections); } + /** + * @return {@code True} if throttling is enabled. 
+ */ + private boolean isThrottlingEnabled() { + return throttlingPlc != ThrottlingPolicy.NONE; + } + /** {@inheritDoc} */ @SuppressWarnings({"unchecked", "TooBroadScope"}) @Override public void finishCheckpoint() { @@ -908,7 +953,7 @@ public class PageMemoryImpl implements PageMemoryEx { for (Segment seg : segments) seg.segCheckpointPages = null; - if (throttleEnabled) + if (isThrottlingEnabled()) writeThrottle.onFinishCheckpoint(); } @@ -1327,7 +1372,11 @@ public class PageMemoryImpl implements PageMemoryEx { /** * @param page Page pointer. + * @param fullId full page ID. + * @param walPlc * @param walPlc Full page WAL record policy. + * @param markDirty set dirty flag to page. + * @param restore */ private void writeUnlockPage( long page, @@ -1336,13 +1385,13 @@ public class PageMemoryImpl implements PageMemoryEx { boolean markDirty, boolean restore ) { - boolean dirty = isDirty(page); + boolean wasDirty = isDirty(page); //if page is for restore, we shouldn't mark it as changed - if (!restore && markDirty && !dirty && changeTracker != null) + if (!restore && markDirty && !wasDirty && changeTracker != null) changeTracker.apply(page, fullId, this); - boolean pageWalRec = markDirty && walPlc != FALSE && (walPlc == TRUE || !dirty); + boolean pageWalRec = markDirty && walPlc != FALSE && (walPlc == TRUE || !wasDirty); assert GridUnsafe.getInt(page + PAGE_OVERHEAD + 4) == 0; //TODO GG-11480 @@ -1360,7 +1409,7 @@ public class PageMemoryImpl implements PageMemoryEx { try { rwLock.writeUnlock(page + PAGE_LOCK_OFFSET, PageIdUtils.tag(pageId)); - if (throttleEnabled && !restore && markDirty) + if (isThrottlingEnabled() && !restore && markDirty && !wasDirty) writeThrottle.onMarkDirty(isInCheckpoint(fullId)); } catch (AssertionError ex) { @@ -1449,12 +1498,13 @@ public class PageMemoryImpl implements PageMemoryEx { /** * This method must be called in synchronized context. * + * @param pageId full page ID. * @param absPtr Absolute pointer. 
* @param dirty {@code True} dirty flag. * @param forceAdd If this flag is {@code true}, then the page will be added to the dirty set regardless whether the * old flag was dirty or not. */ - void setDirty(FullPageId pageId, long absPtr, boolean dirty, boolean forceAdd) { + private void setDirty(FullPageId pageId, long absPtr, boolean dirty, boolean forceAdd) { boolean wasDirty = PageHeader.dirty(absPtr, dirty); if (dirty) { @@ -1733,9 +1783,9 @@ public class PageMemoryImpl implements PageMemoryEx { /** * @param region Memory region. - * @param throttlingEnabled Write throttling enabled flag. + * @param throttlingPlc policy determine if write throttling enabled and its type. */ - private Segment(int idx, DirectMemoryRegion region, int cpPoolPages, boolean throttlingEnabled) { + private Segment(int idx, DirectMemoryRegion region, int cpPoolPages, ThrottlingPolicy throttlingPlc) { long totalMemory = region.size(); int pages = (int)(totalMemory / sysPageSize); @@ -1752,7 +1802,9 @@ public class PageMemoryImpl implements PageMemoryEx { pool = new PagePool(idx, poolRegion, null); - maxDirtyPages = throttlingEnabled ? pool.pages() * 3 / 4 : Math.min(pool.pages() * 2 / 3, cpPoolPages); + maxDirtyPages = throttlingPlc != ThrottlingPolicy.NONE + ? pool.pages() * 3 / 4 + : Math.min(pool.pages() * 2 / 3, cpPoolPages); } /** @@ -1780,7 +1832,14 @@ public class PageMemoryImpl implements PageMemoryEx { * @param dirtyRatioThreshold Throttle threshold. */ private boolean shouldThrottle(double dirtyRatioThreshold) { - return ((double)dirtyPages.size()) / pages() > dirtyRatioThreshold; + return getDirtyPagesRatio() > dirtyRatioThreshold; + } + + /** + * @return dirtyRatio to be compared with Throttle threshold. + */ + private double getDirtyPagesRatio() { + return ((double)dirtyPages.size()) / pages(); } /** @@ -2605,4 +2664,13 @@ public class PageMemoryImpl implements PageMemoryEx { } } } + + /** + * Throttling enabled and its type enum. 
+ */ + public enum ThrottlingPolicy { + /** Not throttled. */NONE, + /** Target ratio based: CP progress is used as border. */ TARGET_RATIO_BASED, + /** Speed based. CP writting speed and estimated ideal speed are used as border */ SPEED_BASED + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b50aa5eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java new file mode 100644 index 0000000..cb19eca --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java @@ -0,0 +1,519 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +package org.apache.ignite.internal.processors.cache.persistence.pagemem; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.LockSupport; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; +import org.apache.ignite.internal.util.GridConcurrentHashSet; + +/** + * Throttles threads that generate dirty pages during ongoing checkpoint. + * Designed to avoid zero dropdowns that can happen if checkpoint buffer is overflowed. + * Uses average checkpoint write speed and moment speed of marking pages as dirty. + */ +public class PagesWriteSpeedBasedThrottle implements PagesWriteThrottlePolicy { + /** Maximum dirty pages in region. */ + private static final double MAX_DIRTY_PAGES = 0.75; + + /** Page memory. */ + private final PageMemoryImpl pageMemory; + + /** Database manager. */ + private final GridCacheDatabaseSharedManager dbSharedMgr; + + /** Starting throttle time. Limits write speed to 1000 MB/s. */ + private static final long STARTING_THROTTLE_NANOS = 4000; + + /** Backoff ratio. Each next park will be this times longer. */ + private static final double BACKOFF_RATIO = 1.05; + + /** Percent of dirty pages which will not cause throttling. */ + private static final double MIN_RATIO_NO_THROTTLE = 0.03; + + /** Exponential backoff counter. */ + private final AtomicInteger exponentialBackoffCntr = new AtomicInteger(0); + + /** Counter of written pages from checkpoint. Value is saved here for detecting checkpoint start. */ + private final AtomicInteger lastObservedWritten = new AtomicInteger(0); + + /** + * Dirty pages ratio was observed at checkpoint start (here start is moment when first page was actually saved to + * store). This ratio is excluded from throttling. 
+ */ + private volatile double initDirtyRatioAtCpBegin = MIN_RATIO_NO_THROTTLE; + + /** + * Target (maximum) dirty pages ratio, after which throttling will start using + * {@link #getParkTime(double, long, int, int, long, long)}. + */ + private volatile double targetDirtyRatio; + + /** + * Current dirty pages ratio (percent of dirty pages in most used segment), negative value means no cp is running. + */ + private volatile double currDirtyRatio; + + /** Speed average checkpoint write speed. Current and 3 past checkpoints used. Pages/second. */ + private final IntervalBasedMeasurement speedCpWrite = new IntervalBasedMeasurement(); + + /** Last estimated speed for marking all clear pages as dirty till the end of checkpoint. */ + private volatile long speedForMarkAll; + + /** Threads set. Contains identifiers of all threads which were marking pages for current checkpoint. */ + private final GridConcurrentHashSet<Long> threadIds = new GridConcurrentHashSet<>(); + + /** + * Used for calculating speed of marking pages dirty. + * Value from past 750-1000 millis only. + * {@link IntervalBasedMeasurement#getSpeedOpsPerSec(long)} returns pages marked/second. + * {@link IntervalBasedMeasurement#getAverage()} returns average throttle time. + * */ + private final IntervalBasedMeasurement speedMarkAndAvgParkTime = new IntervalBasedMeasurement(250, 3); + + /** Total pages which is possible to store in page memory. */ + private long totalPages; + + /** Logger. */ + private IgniteLogger log; + + /** Previous warning time, nanos. */ + private AtomicLong prevWarnTime = new AtomicLong(); + + /** Warning min delay nanoseconds. */ + private static final long WARN_MIN_DELAY_NS = TimeUnit.SECONDS.toNanos(10); + + /** Warning threshold: minimal level of pressure that causes warning messages to log. */ + static final double WARN_THRESHOLD = 0.2; + + /** + * @param pageMemory Page memory. + * @param dbSharedMgr Database manager. + * @param log Logger. 
+ */ + public PagesWriteSpeedBasedThrottle(PageMemoryImpl pageMemory, + GridCacheDatabaseSharedManager dbSharedMgr, IgniteLogger log) { + this.pageMemory = pageMemory; + this.dbSharedMgr = dbSharedMgr; + totalPages = pageMemory.totalPages(); + this.log = log; + } + + /** {@inheritDoc} */ + @Override public void onMarkDirty(boolean isPageInCheckpoint) { + assert dbSharedMgr.checkpointLockIsHeldByThread(); + + AtomicInteger writtenPagesCntr = dbSharedMgr.writtenPagesCounter(); + + if (writtenPagesCntr == null) { + speedForMarkAll = 0; + targetDirtyRatio = -1; + currDirtyRatio = -1; + + return; // Don't throttle if checkpoint is not running. + } + + int cpWrittenPages = writtenPagesCntr.get(); + + long fullyCompletedPages = (cpWrittenPages + cpSyncedPages()) / 2; // written & sync'ed + + long curNanoTime = System.nanoTime(); + + speedCpWrite.setCounter(fullyCompletedPages, curNanoTime); + + long markDirtySpeed = speedMarkAndAvgParkTime.getSpeedOpsPerSec(curNanoTime); + + long curCpWriteSpeed = speedCpWrite.getSpeedOpsPerSec(curNanoTime); + + threadIds.add(Thread.currentThread().getId()); + + ThrottleMode level = ThrottleMode.NO; //should apply delay (throttling) for current page modification + + if (isPageInCheckpoint) { + int checkpointBufLimit = pageMemory.checkpointBufferPagesSize() * 2 / 3; + + if (pageMemory.checkpointBufferPagesCount() > checkpointBufLimit) + level = ThrottleMode.EXPONENTIAL; + } + + long throttleParkTimeNs = 0; + + if (level == ThrottleMode.NO) { + int nThreads = threadIds.size(); + + int cpTotalPages = cpTotalPages(); + + if (cpTotalPages == 0) { + boolean throttleByCpSpeed = curCpWriteSpeed > 0 && markDirtySpeed > curCpWriteSpeed; + + if (throttleByCpSpeed) { + throttleParkTimeNs = calcDelayTime(curCpWriteSpeed, nThreads, 1); + + level = ThrottleMode.LIMITED; + } + } + else { + double dirtyPagesRatio = pageMemory.getDirtyPagesRatio(); + + currDirtyRatio = dirtyPagesRatio; + + detectCpPagesWriteStart(cpWrittenPages, dirtyPagesRatio); + + if 
(dirtyPagesRatio >= MAX_DIRTY_PAGES) + level = ThrottleMode.NO; // too late to throttle, will wait on safe to update instead. + else { + int notEvictedPagesTotal = cpTotalPages - cpEvictedPages(); + + throttleParkTimeNs = getParkTime(dirtyPagesRatio, + fullyCompletedPages, + notEvictedPagesTotal < 0 ? 0 : notEvictedPagesTotal, + nThreads, + markDirtySpeed, + curCpWriteSpeed); + + level = ThrottleMode.LIMITED; + } + } + } + + if (level == ThrottleMode.NO) { + exponentialBackoffCntr.set(0); + + throttleParkTimeNs = 0; + } + else if (level == ThrottleMode.EXPONENTIAL) { + int exponent = exponentialBackoffCntr.getAndIncrement(); + + throttleParkTimeNs = (long)(STARTING_THROTTLE_NANOS * Math.pow(BACKOFF_RATIO, exponent)); + } + + if (throttleParkTimeNs > 0) { + recurrentLogIfNeed(); + + doPark(throttleParkTimeNs); + } + + speedMarkAndAvgParkTime.addMeasurementForAverageCalculation(throttleParkTimeNs); + } + + /** + * Disables the current thread for thread scheduling purposes. May be overriden by subclasses for tests + * + * @param throttleParkTimeNs the maximum number of nanoseconds to wait + */ + protected void doPark(long throttleParkTimeNs) { + LockSupport.parkNanos(throttleParkTimeNs); + } + + /** + * @return number of written pages. + */ + private int cpWrittenPages() { + AtomicInteger writtenPagesCntr = dbSharedMgr.writtenPagesCounter(); + + return writtenPagesCntr == null ? 0 : writtenPagesCntr.get(); + } + + /** + * @return Number of pages in current checkpoint. + */ + private int cpTotalPages() { + return dbSharedMgr.currentCheckpointPagesCount(); + } + + /** + * @return Counter for fsynced checkpoint pages. + */ + private int cpSyncedPages() { + AtomicInteger syncedPagesCntr = dbSharedMgr.syncedPagesCounter(); + + return syncedPagesCntr == null ? 0 : syncedPagesCntr.get(); + } + + /** + * @return number of evicted pages. 
+ */ + private int cpEvictedPages() { + AtomicInteger evictedPagesCntr = dbSharedMgr.evictedPagesCntr(); + + return evictedPagesCntr == null ? 0 : evictedPagesCntr.get(); + } + + /** + * Prints a warning to the log if throttling has occurred and consumes a noticeable amount of time. + */ + private void recurrentLogIfNeed() { + long prevWarningNs = prevWarnTime.get(); + long curNs = System.nanoTime(); + + if (prevWarningNs != 0 && (curNs - prevWarningNs) <= WARN_MIN_DELAY_NS) + return; + + double weight = throttleWeight(); + if (weight <= WARN_THRESHOLD) + return; + + if (prevWarnTime.compareAndSet(prevWarningNs, curNs)) { + String msg = String.format("Throttling is applied to page modifications " + + "[percentOfPartTime=%.2f, markDirty=%d pages/sec, checkpointWrite=%d pages/sec, " + + "estIdealMarkDirty=%d pages/sec, curDirty=%.2f, maxDirty=%.2f, avgParkTime=%d ns, " + + "pages: (total=%d, evicted=%d, written=%d, synced=%d, cpBufUsed=%d, cpBufTotal=%d)]", + weight, getMarkDirtySpeed(), getCpWriteSpeed(), + getLastEstimatedSpeedForMarkAll(), getCurrDirtyRatio(), getTargetDirtyRatio(), throttleParkTime(), + cpTotalPages(), cpEvictedPages(), cpWrittenPages(), cpSyncedPages(), + pageMemory.checkpointBufferPagesCount(), pageMemory.checkpointBufferPagesSize()); + + log.info(msg); + } + } + + /** + * @param dirtyPagesRatio actual percent of dirty pages. + * @param fullyCompletedPages written & fsynced pages count. + * @param cpTotalPages total checkpoint scope. + * @param nThreads number of threads providing data during current checkpoint. + * @param markDirtySpeed registered mark dirty speed, pages/sec. + * @param curCpWriteSpeed average checkpoint write speed, pages/sec. + * @return time in nanoseconds to park or 0 if throttling is not required.
+ */ + long getParkTime( + double dirtyPagesRatio, + long fullyCompletedPages, + int cpTotalPages, + int nThreads, + long markDirtySpeed, + long curCpWriteSpeed) { + + long speedForMarkAll = calcSpeedToMarkAllSpaceTillEndOfCp(dirtyPagesRatio, + fullyCompletedPages, + curCpWriteSpeed, + cpTotalPages); + + double targetDirtyRatio = calcTargetDirtyRatio(fullyCompletedPages, cpTotalPages); + + this.speedForMarkAll = speedForMarkAll; //publish for metrics + this.targetDirtyRatio = targetDirtyRatio; //publish for metrics + + boolean lowSpaceLeft = dirtyPagesRatio > targetDirtyRatio && (dirtyPagesRatio + 0.05 > MAX_DIRTY_PAGES); + int slowdown = lowSpaceLeft ? 3 : 1; + + double multiplierForSpeedForMarkAll = lowSpaceLeft + ? 0.8 + : 1.0; + + boolean markingTooFast = speedForMarkAll > 0 && markDirtySpeed > multiplierForSpeedForMarkAll * speedForMarkAll; + boolean throttleBySizeAndMarkSpeed = dirtyPagesRatio > targetDirtyRatio && markingTooFast; + + //for case of speedForMarkAll >> markDirtySpeed, allow write little bit faster than CP average + double allowWriteFasterThanCp = (speedForMarkAll > 0 && markDirtySpeed > 0 && speedForMarkAll > markDirtySpeed) + ? (0.1 * speedForMarkAll / markDirtySpeed) + : (dirtyPagesRatio > targetDirtyRatio ? 0.0 : 0.1); + + double fasterThanCpWriteSpeed = lowSpaceLeft + ? 1.0 + : 1.0 + allowWriteFasterThanCp; + boolean throttleByCpSpeed = curCpWriteSpeed > 0 && markDirtySpeed > (fasterThanCpWriteSpeed * curCpWriteSpeed); + + long delayByCpWrite = throttleByCpSpeed ? calcDelayTime(curCpWriteSpeed, nThreads, slowdown) : 0; + long delayByMarkAllWrite = throttleBySizeAndMarkSpeed ? calcDelayTime(speedForMarkAll, nThreads, slowdown) : 0; + return Math.max(delayByCpWrite, delayByMarkAllWrite); + } + + /** + * @param dirtyPagesRatio current percent of dirty pages. + * @param fullyCompletedPages count of written and sync'ed pages + * @param curCpWriteSpeed pages/second checkpoint write speed. 0 speed means 'no data'. 
+ * @param cpTotalPages total pages in checkpoint. + * @return pages/second to mark all clean pages as dirty till the end of checkpoint. 0 speed means 'no + * data'. + */ + private long calcSpeedToMarkAllSpaceTillEndOfCp(double dirtyPagesRatio, + long fullyCompletedPages, + long curCpWriteSpeed, + int cpTotalPages) { + + if (curCpWriteSpeed == 0) + return 0; + + if (cpTotalPages <= 0) + return 0; + + if (dirtyPagesRatio >= MAX_DIRTY_PAGES) + return 0; + + double remainedClear = (MAX_DIRTY_PAGES - dirtyPagesRatio) * totalPages; + + double timeRemainedSeconds = 1.0 * (cpTotalPages - fullyCompletedPages) / curCpWriteSpeed; + + return (long)(remainedClear / timeRemainedSeconds); + } + + /** + * @param fullyCompletedPages number of completed pages. + * @param cpTotalPages Total amount of pages under checkpoint. + * @return size-based calculation of target ratio. + */ + private double calcTargetDirtyRatio(long fullyCompletedPages, int cpTotalPages) { + double cpProgress = ((double)fullyCompletedPages) / cpTotalPages; + + // Starting with initialDirtyRatioAtCpBegin to avoid throttle right after checkpoint start + double constStart = initDirtyRatioAtCpBegin; + + double throttleTotalWeight = 1.0 - constStart; + + // .75 is maximum ratio of dirty pages + return (cpProgress * throttleTotalWeight + constStart) * MAX_DIRTY_PAGES; + } + + /** + * @param baseSpeed speed to slow down. + * @param nThreads operating threads. + * @param coefficient how much it is needed to slow down base speed. 1.0 means delay to get exact base speed. + * @return sleep time in nanoseconds. + */ + private long calcDelayTime(long baseSpeed, int nThreads, double coefficient) { + if (coefficient <= 0.0) + return 0; + + if (baseSpeed <= 0) + return 0; + + long updTimeNsForOnePage = TimeUnit.SECONDS.toNanos(1) * nThreads / (baseSpeed); + + return (long)(coefficient * updTimeNsForOnePage); + } + + /** + * @param cpWrittenPages current counter of written pages.
+ * @param dirtyPagesRatio current percent of dirty pages. + */ + private void detectCpPagesWriteStart(int cpWrittenPages, double dirtyPagesRatio) { + if (cpWrittenPages > 0 && lastObservedWritten.compareAndSet(0, cpWrittenPages)) { + double newMinRatio = dirtyPagesRatio; + + if (newMinRatio < MIN_RATIO_NO_THROTTLE) + newMinRatio = MIN_RATIO_NO_THROTTLE; + + if (newMinRatio > 1) + newMinRatio = 1; + + //for slow cp is completed now, drop previous dirty page percent + initDirtyRatioAtCpBegin = newMinRatio; + } + } + + /** {@inheritDoc} */ + @Override public void onBeginCheckpoint() { + speedCpWrite.setCounter(0L, System.nanoTime()); + + initDirtyRatioAtCpBegin = MIN_RATIO_NO_THROTTLE; + + lastObservedWritten.set(0); + } + + + /** {@inheritDoc} */ + @Override public void onFinishCheckpoint() { + exponentialBackoffCntr.set(0); + + speedCpWrite.finishInterval(); + speedMarkAndAvgParkTime.finishInterval(); + threadIds.clear(); + } + + /** + * @return Exponential backoff counter. + */ + public long throttleParkTime() { + return speedMarkAndAvgParkTime.getAverage(); + } + + /** + * @return Target (maximum) dirty pages ratio, after which throttling will start. + */ + public double getTargetDirtyRatio() { + return targetDirtyRatio; + } + + /** + * @return Current dirty pages ratio. + */ + public double getCurrDirtyRatio() { + double ratio = currDirtyRatio; + + if (ratio >= 0) + return ratio; + + return pageMemory.getDirtyPagesRatio(); + } + + /** + * @return Speed of marking pages dirty. Value from past 750-1000 millis only. Pages/second. + */ + public long getMarkDirtySpeed() { + return speedMarkAndAvgParkTime.getSpeedOpsPerSec(System.nanoTime()); + } + + /** + * @return Speed average checkpoint write speed. Current and 3 past checkpoints used. Pages/second. + */ + public long getCpWriteSpeed() { + return speedCpWrite.getSpeedOpsPerSecReadOnly(); + } + + /** + * @return Returns {@link #speedForMarkAll}. 
+ */ + public long getLastEstimatedSpeedForMarkAll() { + return speedForMarkAll; + } + + /** + * Measurement shows how much throttling time is involved into average marking time. + * @return metric started from 0.0 and showing how much throttling is involved into current marking process. + */ + public double throttleWeight() { + long speed = speedMarkAndAvgParkTime.getSpeedOpsPerSec(System.nanoTime()); + + if (speed <= 0) + return 0; + + long timeForOnePage = calcDelayTime(speed, threadIds.size(), 1); + + if (timeForOnePage == 0) + return 0; + + return 1.0 * throttleParkTime() / timeForOnePage; + } + + /** + * Throttling mode for page. + */ + private enum ThrottleMode { + /** No delay is applied. */ + NO, + + /** Limited, time is based on target speed. */ + LIMITED, + + /** Exponential. */ + EXPONENTIAL + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b50aa5eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java index a890442..9206935 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java @@ -24,7 +24,7 @@ import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabase * Throttles threads that generate dirty pages during ongoing checkpoint. * Designed to avoid zero dropdowns that can happen if checkpoint buffer is overflowed. */ -public class PagesWriteThrottle { +public class PagesWriteThrottle implements PagesWriteThrottlePolicy { /** Page memory. 
*/ private final PageMemoryImpl pageMemory; @@ -48,11 +48,8 @@ public class PagesWriteThrottle { this.dbSharedMgr = dbSharedMgr; } - /** - * Callback to apply throttling delay. - * @param isInCheckpoint flag indicating if checkpoint is running. - */ - public void onMarkDirty(boolean isInCheckpoint) { + /** {@inheritDoc} */ + @Override public void onMarkDirty(boolean isPageInCheckpoint) { assert dbSharedMgr.checkpointLockIsHeldByThread(); AtomicInteger writtenPagesCntr = dbSharedMgr.writtenPagesCounter(); @@ -62,7 +59,7 @@ public class PagesWriteThrottle { boolean shouldThrottle = false; - if (isInCheckpoint) { + if (isPageInCheckpoint) { int checkpointBufLimit = pageMemory.checkpointBufferPagesSize() * 2 / 3; shouldThrottle = pageMemory.checkpointBufferPagesCount() > checkpointBufLimit; @@ -96,10 +93,12 @@ public class PagesWriteThrottle { exponentialBackoffCntr.set(0); } - /** - * - */ - public void onFinishCheckpoint() { + /** {@inheritDoc} */ + @Override public void onBeginCheckpoint() { + } + + /** {@inheritDoc} */ + @Override public void onFinishCheckpoint() { exponentialBackoffCntr.set(0); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b50aa5eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java new file mode 100644 index 0000000..adeaa3d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.pagemem; + +/** + * Throttling policy, encapsulates logic of delaying write operations. + */ +public interface PagesWriteThrottlePolicy { + /** + * Callback to apply throttling delay. + * @param isPageInCheckpoint flag indicating if current page is in scope of current checkpoint. + */ + void onMarkDirty(boolean isPageInCheckpoint); + + /** + * Callback to notify throttling policy checkpoint was started. + */ + void onBeginCheckpoint(); + + /** + * Callback to notify throttling policy checkpoint was finished. 
+ */ + void onFinishCheckpoint(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b50aa5eb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteCheckpointDirtyPagesForLowLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteCheckpointDirtyPagesForLowLoadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteCheckpointDirtyPagesForLowLoadTest.java index 782949f..c17f6cb 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteCheckpointDirtyPagesForLowLoadTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteCheckpointDirtyPagesForLowLoadTest.java @@ -120,7 +120,7 @@ public class IgniteCheckpointDirtyPagesForLowLoadTest extends GridCommonAbstract boolean checkpointWithLowNumOfPagesFound = false; - for (int i = 0; i < 10; i++) { + for (int i = 0; i < 20; i++) { Random random = new Random(); //touch some entry int d = random.nextInt(PARTS) + PARTS; @@ -134,18 +134,29 @@ public class IgniteCheckpointDirtyPagesForLowLoadTest extends GridCommonAbstract if (log.isInfoEnabled()) log.info("Put to cache [" + fullname + "] value " + d); - final int timeout = 5000; + long start = System.nanoTime(); try { - db.wakeupForCheckpoint("").get(timeout, TimeUnit.MILLISECONDS); + final int cpTimeout = 25000; + + db.wakeupForCheckpoint("").get(cpTimeout, TimeUnit.MILLISECONDS); } - catch (IgniteFutureTimeoutCheckedException e) { + catch (IgniteFutureTimeoutCheckedException ignored) { + long msPassed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + + log.error("Timeout during waiting for checkpoint to start:" + + " [" + msPassed + "] but checkpoint is not running"); + continue; } + final int 
timeout = 5000; int currCpPages = waitForCurrentCheckpointPagesCounterUpdated(db, timeout); - if (currCpPages < 0) + if (currCpPages < 0) { + log.error("Timeout during waiting for checkpoint counter to be updated"); + continue; + } pageCntObserved.add(currCpPages); http://git-wip-us.apache.org/repos/asf/ignite/blob/b50aa5eb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteMassLoadSandboxTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteMassLoadSandboxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteMassLoadSandboxTest.java new file mode 100644 index 0000000..c49f08e --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteMassLoadSandboxTest.java @@ -0,0 +1,515 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.db.checkpoint; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.Cache; +import junit.framework.TestCase; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheRebalanceMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.configuration.BinaryConfiguration; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR; + +/** + * Sandbox test to measure progress of grid write operations. 
If no progress occurs during a period of time, then thread + dumps are generated. + */ + public class IgniteMassLoadSandboxTest extends GridCommonAbstractTest { + /** Cache name. Random to cover external stores possible problems. */ + public static final String CACHE_NAME = "partitioned" + new Random().nextInt(10000000); + + /** Object size - minimal size of object to be placed in cache. */ + private static final int OBJECT_SIZE = 40000; + + /** Records count to continuously put into cache. */ + private static final int CONTINUOUS_PUT_RECS_CNT = 300_000; + + /** Put thread: client threads naming for put operation. */ + private static final String PUT_THREAD = "put-thread"; + + /** Get thread: client threads naming for verify operation. */ + private static final String GET_THREAD = "get-thread"; + + /** Option to enable storage verification after test. */ + private static final boolean VERIFY_STORAGE = false; + + /** + * Set WAL archive and work folders to same value. Activates 'No Archiver' mode. + * See {@link FileWriteAheadLogManager#isArchiverEnabled()}. + */ + private boolean setWalArchAndWorkToSameVal; + + /** Option for test run: WAL segments size in bytes. */ + private int walSegmentSize = 64 * 1024 * 1024; + + /** Option for test run: Custom WAL mode. */ + private WALMode customWalMode; + + /** Option for test run: Checkpoint frequency.
*/ + private int checkpointFrequency = 40 * 1000; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheConfiguration<Integer, HugeIndexedObject> ccfg = new CacheConfiguration<>(); + + ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC); + ccfg.setRebalanceMode(CacheRebalanceMode.SYNC); + ccfg.setAffinity(new RendezvousAffinityFunction(false, 1024)); + ccfg.setIndexedTypes(Integer.class, HugeIndexedObject.class); + ccfg.setName(CACHE_NAME); + + cfg.setCacheConfiguration(ccfg); + + DataRegionConfiguration regCfg = new DataRegionConfiguration() + .setName("dfltMemPlc") + .setMetricsEnabled(true) + .setMaxSize(2 * 1024L * 1024 * 1024) + .setPersistenceEnabled(true); + + DataStorageConfiguration dsCfg = new DataStorageConfiguration(); + + dsCfg.setDefaultDataRegionConfiguration(regCfg) + .setPageSize(4 * 1024) + .setWriteThrottlingEnabled(true) + .setCheckpointFrequency(checkpointFrequency); + + final String workDir = U.defaultWorkDirectory(); + final File db = U.resolveWorkDirectory(workDir, DFLT_STORE_DIR, false); + final File wal = new File(db, "wal"); + if (setWalArchAndWorkToSameVal) { + final String walAbsPath = wal.getAbsolutePath(); + + dsCfg.setWalPath(walAbsPath); + + dsCfg.setWalArchivePath(walAbsPath); + } + else { + dsCfg.setWalPath(wal.getAbsolutePath()); + + dsCfg.setWalArchivePath(new File(wal, "archive").getAbsolutePath()); + } + + dsCfg.setWalMode(customWalMode != null ? 
customWalMode : WALMode.LOG_ONLY) + .setWalHistorySize(1) + .setWalSegments(10); + + if (walSegmentSize != 0) + dsCfg.setWalSegmentSize(walSegmentSize); + + cfg.setDataStorageConfiguration(dsCfg); + + cfg.setBinaryConfiguration(new BinaryConfiguration().setCompactFooter(false)); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + stopAllGrids(); + + deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false)); + deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "temp", false)); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * Runs multithreaded put scenario (no data streamer). Load is generated to page store and to WAL. + * @throws Exception if failed. + */ + public void testContinuousPutMultithreaded() throws Exception { + try { + // System.setProperty(IgniteSystemProperties.IGNITE_DIRTY_PAGES_PARALLEL, "true"); + // System.setProperty(IgniteSystemProperties.IGNITE_DIRTY_PAGES_SORTED_STORAGE, "true"); + System.setProperty(IgniteSystemProperties.IGNITE_USE_ASYNC_FILE_IO_FACTORY, "false"); + System.setProperty(IgniteSystemProperties.IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED, "speed"); + + setWalArchAndWorkToSameVal = true; + + customWalMode = WALMode.BACKGROUND; + + final IgniteEx ignite = startGrid(1); + + ignite.active(true); + + final IgniteCache<Object, HugeIndexedObject> cache = ignite.cache(CACHE_NAME); + final int threads = Runtime.getRuntime().availableProcessors(); + + final int recsPerThread = CONTINUOUS_PUT_RECS_CNT / threads; + + final Collection<Callable<?>> tasks = new ArrayList<>(); + + final ProgressWatchdog watchdog = new ProgressWatchdog(ignite, "put", PUT_THREAD); + + for (int j = 0; j < threads; j++) { + final int finalJ = j; + + tasks.add(new Callable<Void>() { + @Override public Void call() throws Exception { + for (int i = finalJ * recsPerThread; i < ((finalJ + 1) * recsPerThread); 
i++) { + HugeIndexedObject v = new HugeIndexedObject(i); + cache.put(i, v); + watchdog.reportProgress(1); + } + return null; + } + }); + } + + watchdog.start(); + GridTestUtils.runMultiThreaded(tasks, PUT_THREAD); + + watchdog.stopping(); + stopGrid(1); + + watchdog.stop(); + + if (VERIFY_STORAGE) + runVerification(threads, recsPerThread); + } + finally { + stopAllGrids(); + } + } + + /** + * Runs multithreaded put scenario (no data streamer). Load is generated to page store and to WAL. + * @throws Exception if failed. + */ + public void testDataStreamerContinuousPutMultithreaded() throws Exception { + try { + // System.setProperty(IgniteSystemProperties.IGNITE_DIRTY_PAGES_PARALLEL, "true"); + // System.setProperty(IgniteSystemProperties.IGNITE_DIRTY_PAGES_SORTED_STORAGE, "true"); + System.setProperty(IgniteSystemProperties.IGNITE_USE_ASYNC_FILE_IO_FACTORY, "false"); + System.setProperty(IgniteSystemProperties.IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED, "speed"); + + setWalArchAndWorkToSameVal = true; + + customWalMode = WALMode.BACKGROUND; + + final IgniteEx ignite = startGrid(1); + + ignite.active(true); + + final int threads = 1; Runtime.getRuntime().availableProcessors(); + + final int recsPerThread = CONTINUOUS_PUT_RECS_CNT / threads; + + final ProgressWatchdog watchdog = new ProgressWatchdog(ignite, "put", PUT_THREAD); + + IgniteDataStreamer<Object, Object> streamer = ignite.dataStreamer(CACHE_NAME); + + streamer.perNodeBufferSize(12); + + final Collection<Callable<?>> tasks = new ArrayList<>(); + for (int j = 0; j < threads; j++) { + final int finalJ = j; + + tasks.add((Callable<Void>)() -> { + for (int i = finalJ * recsPerThread; i < ((finalJ + 1) * recsPerThread); i++) + streamer.addData(i, new HugeIndexedObject(i)); + + return null; + }); + } + + final IgniteCache<Object, HugeIndexedObject> cache = ignite.cache(CACHE_NAME); + ScheduledExecutorService svcReport = Executors.newScheduledThreadPool(1); + + AtomicInteger size = new AtomicInteger(); + 
svcReport.scheduleAtFixedRate( + () -> { + int newSize = cache.size(); + int oldSize = size.getAndSet(newSize); + + watchdog.reportProgress(newSize - oldSize); + }, + 250, 250, TimeUnit.MILLISECONDS); + + watchdog.start(); + GridTestUtils.runMultiThreaded(tasks, PUT_THREAD); + streamer.close(); + + watchdog.stopping(); + stopGrid(1); + + watchdog.stop(); + + ProgressWatchdog.stopPool(svcReport); + + if (VERIFY_STORAGE) + runVerification(threads, recsPerThread); + } + finally { + stopAllGrids(); + } + } + + + /** + * Verifies data from storage. + * + * @param threads threads count. + * @param recsPerThread record per thread loaded. + * @throws Exception if failed + */ + private void runVerification(int threads, final int recsPerThread) throws Exception { + final Ignite restartedIgnite = startGrid(1); + + restartedIgnite.active(true); + + final IgniteCache<Integer, HugeIndexedObject> restartedCache = restartedIgnite.cache(CACHE_NAME); + + final ProgressWatchdog watchdog2 = new ProgressWatchdog(restartedIgnite, "get", GET_THREAD); + + final Collection<Callable<?>> tasksR = new ArrayList<>(); + tasksR.clear(); + for (int j = 0; j < threads; j++) { + final int finalJ = j; + tasksR.add(new Callable<Void>() { + @Override public Void call() { + for (int i = finalJ * recsPerThread; i < ((finalJ + 1) * recsPerThread); i++) { + HugeIndexedObject obj = restartedCache.get(i); + int actVal = obj.iVal; + TestCase.assertEquals(i, actVal); + watchdog2.reportProgress(1); + } + return null; + } + }); + } + + watchdog2.start(); + GridTestUtils.runMultiThreaded(tasksR, GET_THREAD); + watchdog2.stop(); + } + + /** + * @param threads Threads count. + * @param recsPerThread initial records per thread. + * @param restartedCache cache to obtain data from. 
+ */ + private void verifyByChunk(int threads, int recsPerThread, Cache<Integer, HugeIndexedObject> restartedCache) { + int verifyChunk = 100; + + int totalRecsToVerify = recsPerThread * threads; + int chunks = totalRecsToVerify / verifyChunk; + + for (int c = 0; c < chunks; c++) { + Set<Integer> keys = new TreeSet<>(); + + for (int i = 0; i < verifyChunk; i++) + keys.add(i + c * verifyChunk); + + Map<Integer, HugeIndexedObject> values = restartedCache.getAll(keys); + + for (Map.Entry<Integer, HugeIndexedObject> next : values.entrySet()) { + Integer key = next.getKey(); + + int actVal = values.get(next.getKey()).iVal; + int i = key; + TestCase.assertEquals(i, actVal); + + if (i % 1000 == 0) + X.println(" >> Verified: " + i); + } + + } + } + + /** + * @param id entry id. + * @return {@code True} if need to keep entry in DB and checkpoint it. Most of entries not required. + */ + private static boolean keepInDb(int id) { + return id % 1777 == 0; + } + + /** + * Runs multithreaded put-remove scenario (no data streamer). Load is generated to WAL log mostly. + * Most of entries generated will be removed before first checkpoint. + * + * @throws Exception if failed. 
     */
    public void testPutRemoveMultithreaded() throws Exception {
        setWalArchAndWorkToSameVal = false;
        customWalMode = WALMode.LOG_ONLY;

        try {
            final IgniteEx ignite = startGrid(1);

            ignite.active(true);

            // NOTE(review): assigned after the first grid start, so it can only affect nodes
            // started later (the restart below) - confirm this ordering is intended.
            checkpointFrequency = 20 * 1000;

            final IgniteCache<Object, HugeIndexedObject> cache = ignite.cache(CACHE_NAME);
            int totalRecs = 400_000;
            final int threads = 10;
            final int recsPerThread = totalRecs / threads;
            final Collection<Callable<?>> tasks = new ArrayList<>();
            final ProgressWatchdog watchdog = new ProgressWatchdog(ignite, "put", PUT_THREAD);

            for (int j = 0; j < threads; j++) {
                final int finalJ = j;

                tasks.add(new Callable<Void>() {
                    @Override public Void call() throws Exception {
                        // Entries are put, then removed in batches of ~100; only keepInDb() ids survive,
                        // so most load lands in the WAL rather than in checkpointed pages.
                        final Collection<Integer> toRmvLaterList = new ArrayList<>();

                        for (int id = finalJ * recsPerThread; id < ((finalJ + 1) * recsPerThread); id++) {
                            HugeIndexedObject v = new HugeIndexedObject(id);

                            cache.put(id, v);
                            toRmvLaterList.add(id);
                            watchdog.reportProgress(1);

                            if (toRmvLaterList.size() > 100) {
                                for (Integer toRemoveId : toRmvLaterList) {
                                    if (keepInDb(toRemoveId))
                                        continue;

                                    boolean rmv = cache.remove(toRemoveId);
                                    assert rmv : "Expected to remove object from cache " + toRemoveId;
                                }
                                toRmvLaterList.clear();
                            }
                        }
                        return null;
                    }
                });
            }

            watchdog.start();
            GridTestUtils.runMultiThreaded(tasks, PUT_THREAD);
            // NOTE(review): sibling tests call stopping() before stopGrid() and stop() after;
            // here stop() precedes the node stop - confirm whether stopping() was intentionally skipped.
            watchdog.stop();
            stopGrid(1);

            // Restart to check that exactly the keepInDb() entries survived in the persistent store.
            final Ignite restartedIgnite = startGrid(1);

            restartedIgnite.active(true);

            final IgniteCache<Object, HugeIndexedObject> restartedCache = restartedIgnite.cache(CACHE_NAME);

            for (int i = 0; i < recsPerThread * threads; i++) {
                if (keepInDb(i)) {
                    final HugeIndexedObject obj = restartedCache.get(i);

                    TestCase.assertNotNull(obj);
                    TestCase.assertEquals(i, obj.iVal);
                }

                if (i % 1000 == 0)
                    X.print(" V: " + i);
            }
        }
        finally {
            stopAllGrids();
        }
    }

    /** {@inheritDoc} */
    @Override protected long getTestTimeout() {
        return TimeUnit.MINUTES.toMillis(20);
    }

    /** Object with additional 40 000 bytes of payload */
    public static class HugeIndexedObject {
        /** Payload bytes; content is a deterministic function of OBJECT_SIZE (see constructor). */
        private byte[] data;

        /** Integer value; SQL-indexed and the sole identity for equals/hashCode. */
        @QuerySqlField(index = true)
        private int iVal;

        /**
         * @param iVal Integer value.
         */
        private HugeIndexedObject(int iVal) {
            this.iVal = iVal;

            int sz = OBJECT_SIZE;

            // Repeating 'A'..'J' pattern, so the payload never varies between equal-iVal instances.
            data = new byte[sz];
            for (int i = 0; i < sz; i++)
                data[i] = (byte)('A' + (i % 10));
        }

        /**
         * @return Data.
         */
        public byte[] data() {
            return data;
        }

        /** {@inheritDoc} */
        @Override public boolean equals(Object o) {
            if (this == o)
                return true;

            if (!(o instanceof HugeIndexedObject))
                return false;

            HugeIndexedObject that = (HugeIndexedObject)o;

            // Identity is iVal only; the payload is derived from it and deliberately ignored.
            return iVal == that.iVal;
        }

        /** {@inheritDoc} */
        @Override public int hashCode() {
            return iVal;
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return S.toString(HugeIndexedObject.class, this);
        }
    }
}
