IGNITE-9909 Merge FsyncWalManager and WalManager - Fixes #5013. Signed-off-by: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/889ce79b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/889ce79b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/889ce79b Branch: refs/heads/master Commit: 889ce79bba0187891d334fedbc63235ff85fabe6 Parents: cf81e5a Author: Anton Kalashnikov <kaa....@yandex.ru> Authored: Fri Nov 9 15:05:18 2018 +0300 Committer: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com> Committed: Fri Nov 9 15:05:18 2018 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheProcessor.java | 17 +- .../wal/FileWriteAheadLogManager.java | 1017 +---- .../wal/FsyncModeFileWriteAheadLogManager.java | 3482 ------------------ .../wal/filehandle/AbstractFileHandle.java | 47 + .../wal/filehandle/FileHandleManager.java | 81 + .../filehandle/FileHandleManagerFactory.java | 90 + .../wal/filehandle/FileHandleManagerImpl.java | 603 +++ .../wal/filehandle/FileWriteHandle.java | 113 + .../wal/filehandle/FileWriteHandleImpl.java | 601 +++ .../filehandle/FsyncFileHandleManagerImpl.java | 157 + .../wal/filehandle/FsyncFileWriteHandle.java | 845 +++++ .../wal/serializer/RecordSerializerFactory.java | 2 + .../resources/META-INF/classnames.properties | 5 - ...lFlushMultiNodeFailoverAbstractSelfTest.java | 9 +- .../wal/IgniteWalIteratorSwitchSegmentTest.java | 65 +- .../wal/memtracker/PageMemoryTracker.java | 45 +- .../yardstick/IgniteAbstractBenchmark.java | 7 +- 17 files changed, 2615 insertions(+), 4571 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/889ce79b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 317378b..940e4a6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -62,7 +62,6 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.MemoryConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.configuration.TransactionConfiguration; -import org.apache.ignite.configuration.WALMode; import org.apache.ignite.events.EventType; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; @@ -111,7 +110,6 @@ import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCa import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage; import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; -import org.apache.ignite.internal.processors.cache.persistence.wal.FsyncModeFileWriteAheadLogManager; import org.apache.ignite.internal.processors.cache.query.GridCacheDistributedQueryManager; import org.apache.ignite.internal.processors.cache.query.GridCacheLocalQueryManager; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager; @@ -214,9 +212,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { private final boolean startClientCaches = IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_START_CACHES_ON_JOIN, false); - private final boolean walFsyncWithDedicatedWorker = - IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_WAL_FSYNC_WITH_DEDICATED_WORKER, false); - /** Enables start caches in parallel. */ private final boolean IGNITE_ALLOW_START_CACHES_IN_PARALLEL = IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_ALLOW_START_CACHES_IN_PARALLEL, true); @@ -3041,13 +3036,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { walMgr = ctx.plugins().createComponent(IgniteWriteAheadLogManager.class); - if (walMgr == null) { - if (ctx.config().getDataStorageConfiguration().getWalMode() == WALMode.FSYNC && - !walFsyncWithDedicatedWorker) - walMgr = new FsyncModeFileWriteAheadLogManager(ctx); - else - walMgr = new FileWriteAheadLogManager(ctx); - } + if (walMgr == null) + walMgr = new FileWriteAheadLogManager(ctx); } else { if (CU.isPersistenceEnabled(ctx.config()) && ctx.clientNode()) { @@ -3531,7 +3521,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param checkThreadTx If {@code true} checks that current thread does not have active transactions. * @return Future that will be completed when cache is deployed. */ - @SuppressWarnings("IfMayBeConditional") public IgniteInternalFuture<Boolean> dynamicStartCache( @Nullable CacheConfiguration ccfg, String cacheName, @@ -3555,7 +3544,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { * * @param ccfg Cache configuration. */ - @SuppressWarnings("IfMayBeConditional") public IgniteInternalFuture<Boolean> dynamicStartSqlCache( CacheConfiguration ccfg ) { @@ -3584,7 +3572,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param checkThreadTx If {@code true} checks that current thread does not have active transactions. * @return Future that will be completed when cache is deployed. */ - @SuppressWarnings("IfMayBeConditional") public IgniteInternalFuture<Boolean> dynamicStartCache( @Nullable CacheConfiguration ccfg, String cacheName, http://git-wip-us.apache.org/repos/asf/ignite/blob/889ce79b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index fbc4f6c..0bd20bc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -26,12 +26,8 @@ import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; -import java.lang.reflect.Field; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; import java.nio.ByteBuffer; import java.nio.ByteOrder; -import java.nio.MappedByteBuffer; import java.nio.channels.ClosedByInterruptException; import java.nio.file.FileAlreadyExistsException; import java.nio.file.Files; @@ -46,17 +42,9 @@ import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.TreeSet; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.PriorityBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.LockSupport; -import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; import java.util.stream.Stream; import java.util.zip.ZipEntry; @@ -79,7 +67,6 @@ import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; import org.apache.ignite.internal.pagemem.wal.WALIterator; import org.apache.ignite.internal.pagemem.wal.WALPointer; -import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord; import org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord; import org.apache.ignite.internal.pagemem.wal.record.MemoryRecoveryRecord; import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot; @@ -101,6 +88,10 @@ import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolde import org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentAware; import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc; import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException; +import org.apache.ignite.internal.processors.cache.persistence.wal.filehandle.AbstractFileHandle; +import org.apache.ignite.internal.processors.cache.persistence.wal.filehandle.FileHandleManagerFactory; +import org.apache.ignite.internal.processors.cache.persistence.wal.filehandle.FileHandleManager; +import org.apache.ignite.internal.processors.cache.persistence.wal.filehandle.FileWriteHandle; import org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput; import org.apache.ignite.internal.processors.cache.persistence.wal.io.LockedSegmentFileInputFactory; import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentFileInputFactory; @@ -114,7 +105,6 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.Re import org.apache.ignite.internal.processors.failure.FailureProcessor; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; -import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.CI1; @@ -139,48 +129,20 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_CHECKPOINT_TRIGGER import static org.apache.ignite.IgniteSystemProperties.IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE; import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_COMPRESSOR_WORKER_THREAD_CNT; import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_MMAP; -import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_SEGMENT_SYNC_TIMEOUT; import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_SERIALIZER_VERSION; -import static org.apache.ignite.configuration.WALMode.LOG_ONLY; import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_ARCHIVED; import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_COMPACTED; import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR; import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION; -import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.SWITCH_SEGMENT_RECORD; -import static org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer.BufferMode.DIRECT; +import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory.LATEST_SERIALIZER_VERSION; import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.HEADER_RECORD_SIZE; import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.readSegmentHeader; -import static org.apache.ignite.internal.util.IgniteUtils.findField; -import static org.apache.ignite.internal.util.IgniteUtils.findNonPublicMethod; -import static org.apache.ignite.internal.util.IgniteUtils.sleep; /** * File WAL manager. */ @SuppressWarnings("IfMayBeConditional") public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter implements IgniteWriteAheadLogManager { - /** Dfault wal segment sync timeout. */ - public static final long DFLT_WAL_SEGMENT_SYNC_TIMEOUT = 500L; - /** {@link MappedByteBuffer#force0(java.io.FileDescriptor, long, long)}. */ - private static final Method force0 = findNonPublicMethod( - MappedByteBuffer.class, "force0", - java.io.FileDescriptor.class, long.class, long.class - ); - - /** {@link MappedByteBuffer#mappingOffset()}. */ - private static final Method mappingOffset = findNonPublicMethod(MappedByteBuffer.class, "mappingOffset"); - - /** {@link MappedByteBuffer#mappingAddress(long)}. */ - private static final Method mappingAddress = findNonPublicMethod( - MappedByteBuffer.class, "mappingAddress", long.class - ); - - /** {@link MappedByteBuffer#fd} */ - private static final Field fd = findField(MappedByteBuffer.class, "fd"); - - /** Page size. */ - private static final int PAGE_SIZE = GridUnsafe.pageSize(); - /** */ private static final FileDescriptor[] EMPTY_DESCRIPTORS = new FileDescriptor[0]; @@ -235,19 +197,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } }; - /** Latest serializer version to use. */ - private static final int LATEST_SERIALIZER_VERSION = 2; - /** Buffer size. */ private static final int BUF_SIZE = 1024 * 1024; /** Use mapped byte buffer. */ private final boolean mmap = IgniteSystemProperties.getBoolean(IGNITE_WAL_MMAP, true); - /** {@link FileWriteHandle#written} atomic field updater. */ - private static final AtomicLongFieldUpdater<FileWriteHandle> WRITTEN_UPD = - AtomicLongFieldUpdater.newUpdater(FileWriteHandle.class, "written"); - /** * Percentage of archive size for checkpoint trigger. Need for calculate max size of WAL after last checkpoint. * Checkpoint should be triggered when max size of WAL after last checkpoint more than maxWallArchiveSize * thisValue @@ -288,9 +243,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** WAL flush frequency. Makes sense only for {@link WALMode#BACKGROUND} log WALMode. */ private final long flushFreq; - /** Fsync delay. */ - private final long fsyncDelay; - /** */ private final DataStorageConfiguration dsCfg; @@ -347,9 +299,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** */ private final ThreadLocal<WALPointer> lastWALPtr = new ThreadLocal<>(); - /** Current log segment handle */ + /** Current log segment handle. */ private volatile FileWriteHandle currHnd; + /** File handle manager. */ + private FileHandleManager fileHandleManager; + /** */ private volatile WALDisableContext walDisableContext; @@ -378,17 +333,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl */ @Nullable private volatile GridTimeoutObject nextAutoArchiveTimeoutObj; - /** WAL writer worker. */ - private WALWriter walWriter; - /** * Listener invoked for each segment file IO initializer. */ @Nullable private volatile IgniteInClosure<FileIO> createWalFileListener; - /** Wal segment sync worker. */ - private WalSegmentSyncer walSegmentSyncWorker; - /** * Manage of segment location. */ @@ -397,6 +346,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** Segment factory with ability locked segment during reading. */ private SegmentFileInputFactory lockedSegmentFileInputFactory; + private FileHandleManagerFactory fileHandleManagerFactory; + /** * @param ctx Kernal context. */ @@ -412,9 +363,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl maxWalSegmentSize = dsCfg.getWalSegmentSize(); mode = dsCfg.getWalMode(); flushFreq = dsCfg.getWalFlushFrequency(); - fsyncDelay = dsCfg.getWalFsyncDelayNanos(); alwaysWriteFullPages = dsCfg.isAlwaysWriteFullPages(); - ioFactory = new RandomAccessFileIOFactory(); + ioFactory = mode == WALMode.FSYNC ? dsCfg.getFileIOFactory() : new RandomAccessFileIOFactory(); segmentFileInputFactory = new SimpleSegmentFileInputFactory(); walAutoArchiveAfterInactivity = dsCfg.getWalAutoArchiveAfterInactivity(); @@ -422,6 +372,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl evt = ctx.event(); failureProcessor = ctx.failure(); + + fileHandleManagerFactory = new FileHandleManagerFactory(dsCfg); } /** @@ -512,16 +464,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl walDisableContext = cctx.walState().walDisableContext(); - if (mode != WALMode.NONE && mode != WALMode.FSYNC) { - walSegmentSyncWorker = new WalSegmentSyncer(igCfg.getIgniteInstanceName(), - cctx.kernalContext().log(WalSegmentSyncer.class)); + fileHandleManager = fileHandleManagerFactory.build( + cctx, metrics, mmap, lastWALPtr::get, serializer, this::currentHandle + ); - if (log.isInfoEnabled()) - log.info("Started write-ahead log manager [mode=" + mode + ']'); - } - else - U.quietAndWarn(log, "Started write-ahead log manager in NONE mode, persisted data may be lost in " + - "a case of unexpected node failure. Make sure to deactivate the cluster before shutdown."); + fileHandleManager.start(); lockedSegmentFileInputFactory = new LockedSegmentFileInputFactory( segmentAware, @@ -541,8 +488,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl new IgniteThread(archiver).start(); } - if (walSegmentSyncWorker != null) - new IgniteThread(walSegmentSyncWorker).start(); + fileHandleManager.onActivate(); if (compressor != null) compressor.start(); @@ -625,22 +571,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if (timeoutObj != null) cctx.time().removeTimeoutObject(timeoutObj); - final FileWriteHandle currHnd = currentHandle(); - try { - if (mode == WALMode.BACKGROUND) { - if (currHnd != null) - currHnd.flush(null); - } - - if (currHnd != null) - currHnd.close(false); - - if (walSegmentSyncWorker != null) - walSegmentSyncWorker.shutdown(); - - if (walWriter != null) - walWriter.shutdown(); + fileHandleManager.onDeactivate(); segmentAware.interrupt(); @@ -660,7 +592,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } } catch (Exception e) { - U.error(log, "Failed to gracefully close WAL segment: " + this.currHnd.fileIO, e); + U.error(log, "Failed to gracefully close WAL segment: " + this.currHnd, e); } } @@ -706,10 +638,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl FileWALPointer filePtr = (FileWALPointer)lastPtr; - walWriter = new WALWriter(log); - - if (!mmap) - new IgniteThread(walWriter).start(); + fileHandleManager.resumeLogging(); currHnd = restoreWriteHandle(filePtr); @@ -717,16 +646,16 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if (filePtr == null) currHnd.writeHeader(); - if (currHnd.serializer.version() != serializer.version()) { + if (currHnd.serializerVersion() != serializer.version()) { if (log.isInfoEnabled()) log.info("Record serializer version change detected, will start logging with a new WAL record " + "serializer to a new WAL segment [curFile=" + currHnd + ", newVer=" + serializer.version() + - ", oldVer=" + currHnd.serializer.version() + ']'); + ", oldVer=" + currHnd.serializerVersion() + ']'); rollOver(currHnd, null); } - currHnd.resume = false; + currHnd.finishResumeLogging(); if (mode == WALMode.BACKGROUND) { backgroundFlushSchedule = cctx.time().schedule(new Runnable() { @@ -803,9 +732,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl final FileWriteHandle handle = currentHandle(); try { - handle.buf.close(); - - rollOver(handle, null); + closeBufAndRollover(handle, null, RolloverType.NONE); } catch (IgniteCheckedException e) { U.error(log, "Unable to perform segment rollover: " + e.getMessage(), e); @@ -895,7 +822,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl ) throws IgniteCheckedException { long idx = currWriteHandle.getSegmentId(); - currWriteHandle.buf.close(); + currWriteHandle.closeBuffer(); FileWriteHandle res = rollOver(currWriteHandle, rolloverType == RolloverType.NEXT_SEGMENT ? rec : null); @@ -907,28 +834,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** {@inheritDoc} */ @Override public void flush(WALPointer ptr, boolean explicitFsync) throws IgniteCheckedException, StorageException { - if (serializer == null || mode == WALMode.NONE) - return; - - FileWriteHandle cur = currentHandle(); - - // WAL manager was not started (client node). - if (cur == null) - return; - - FileWALPointer filePtr = (FileWALPointer)(ptr == null ? lastWALPtr.get() : ptr); - - if (mode == LOG_ONLY) - cur.flushOrWait(filePtr); - - if (!explicitFsync && mode != WALMode.FSYNC) - return; // No need to sync in LOG_ONLY or BACKGROUND unless explicit fsync is required. - - // No need to sync if was rolled over. - if (filePtr != null && !cur.needFsync(filePtr)) - return; - - cur.fsync(filePtr); + fileHandleManager.flush(ptr, explicitFsync); } /** {@inheritDoc} */ @@ -1330,25 +1236,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl log.info("Resuming logging to WAL segment [file=" + curFile.getAbsolutePath() + ", offset=" + off + ", ver=" + serVer + ']'); - SegmentedRingByteBuffer rbuf; - - if (mmap) { - MappedByteBuffer buf = fileIO.map((int)maxWalSegmentSize); - - rbuf = new SegmentedRingByteBuffer(buf, metrics); - } - else - rbuf = new SegmentedRingByteBuffer(dsCfg.getWalBufferSize(), maxWalSegmentSize, DIRECT, metrics); - - if (lastReadPtr != null) - rbuf.init(lastReadPtr.fileOffset() + lastReadPtr.length()); - - FileWriteHandle hnd = new FileWriteHandle( - fileIO, - off + len, - true, - ser, - rbuf); + FileWriteHandle hnd = fileHandleManager.initHandle(fileIO, off + len, ser); if (archiver0 != null) segmentAware.curAbsWalIdx(absIdx); @@ -1393,8 +1281,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if (log.isDebugEnabled()) log.debug("Switching to a new WAL segment: " + nextFile.getAbsolutePath()); - SegmentedRingByteBuffer rbuf = null; - SegmentIO fileIO = null; FileWriteHandle hnd; @@ -1409,20 +1295,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if (lsnr != null) lsnr.apply(fileIO); - if (mmap) { - MappedByteBuffer buf = fileIO.map((int)maxWalSegmentSize); - rbuf = new SegmentedRingByteBuffer(buf, metrics); - } - else - rbuf = cur.buf.reset(); - - hnd = new FileWriteHandle( - fileIO, - 0, - false, - serializer, - rbuf); + hnd = fileHandleManager.nextHandle(fileIO, serializer); if (interrupted) Thread.currentThread().interrupt(); @@ -1444,12 +1318,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl fileIO = null; } - - if (rbuf != null) { - rbuf.free(); - - rbuf = null; - } } } @@ -2416,7 +2284,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * @param ver Version. * @param compacted Compacted flag. */ - @NotNull private static ByteBuffer prepareSerializerVersionBuffer(long idx, int ver, boolean compacted, ByteBuffer buf) { + @NotNull public static ByteBuffer prepareSerializerVersionBuffer(long idx, int ver, boolean compacted, ByteBuffer buf) { // Write record type. buf.put((byte) (WALRecord.RecordType.HEADER_RECORD.ordinal() + 1)); @@ -2452,33 +2320,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** * */ - private abstract static class FileHandle { - /** I/O interface for read/write operations with file */ - SegmentIO fileIO; - - /** Segment idx corresponded to fileIo*/ - final long segmentIdx; - - /** - * @param fileIO I/O interface for read/write operations of FileHandle. - */ - private FileHandle(@NotNull SegmentIO fileIO) { - this.fileIO = fileIO; - segmentIdx = fileIO.getSegmentId(); - } - - /** - * @return Absolute WAL segment file index (incremental counter). - */ - public long getSegmentId(){ - return segmentIdx; - } - } - - /** - * - */ - public static class ReadFileHandle extends FileHandle implements AbstractWalRecordsIterator.AbstractReadFileHandle { + public static class ReadFileHandle extends AbstractFileHandle implements AbstractWalRecordsIterator.AbstractReadFileHandle { /** Entry serializer. */ RecordSerializer ser; @@ -2489,7 +2331,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl private final SegmentAware segmentAware; /** - * @param fileIO I/O interface for read/write operations of FileHandle. + * @param fileIO I/O interface for read/write operations of AbstractFileHandle. * @param ser Entry serializer. * @param in File input. * @param aware Segment aware. @@ -2542,459 +2384,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** - * File handle for one log segment. - */ - @SuppressWarnings("SignalWithoutCorrespondingAwait") - private class FileWriteHandle extends FileHandle { - /** */ - private final RecordSerializer serializer; - - /** Created on resume logging. */ - private volatile boolean resume; - - /** - * Position in current file after the end of last written record (incremented after file channel write - * operation) - */ - volatile long written; - - /** */ - private volatile long lastFsyncPos; - - /** Stop guard to provide warranty that only one thread will be successful in calling {@link #close(boolean)} */ - private final AtomicBoolean stop = new AtomicBoolean(false); - - /** */ - private final Lock lock = new ReentrantLock(); - - /** Condition for timed wait of several threads, see {@link DataStorageConfiguration#getWalFsyncDelayNanos()} */ - private final Condition fsync = lock.newCondition(); - - /** - * Next segment available condition. Protection from "spurious wakeup" is provided by predicate {@link - * #fileIO}=<code>null</code> - */ - private final Condition nextSegment = lock.newCondition(); - - /** Buffer. */ - private final SegmentedRingByteBuffer buf; - - /** - * @param fileIO I/O file interface to use - * @param pos Position. - * @param resume Created on resume logging flag. - * @param serializer Serializer. - * @param buf Buffer. - * @throws IOException If failed. - */ - private FileWriteHandle( - SegmentIO fileIO, - long pos, - boolean resume, - RecordSerializer serializer, - SegmentedRingByteBuffer buf - ) throws IOException { - super(fileIO); - - assert serializer != null; - - if (!mmap) - fileIO.position(pos); - - this.serializer = serializer; - - written = pos; - lastFsyncPos = pos; - this.resume = resume; - this.buf = buf; - } - - /** - * Write serializer version to current handle. - */ - public void writeHeader() { - SegmentedRingByteBuffer.WriteSegment seg = buf.offer(HEADER_RECORD_SIZE); - - assert seg != null && seg.position() > 0; - - prepareSerializerVersionBuffer(getSegmentId(), serializerVersion(), false, seg.buffer()); - - seg.release(); - } - - /** - * @param rec Record to be added to write queue. - * @return Pointer or null if roll over to next segment is required or already started by other thread. - * @throws StorageException If failed. - * @throws IgniteCheckedException If failed. - */ - @Nullable private WALPointer addRecord(WALRecord rec) throws StorageException, IgniteCheckedException { - assert rec.size() > 0 : rec; - - for (;;) { - checkNode(); - - SegmentedRingByteBuffer.WriteSegment seg; - - // Buffer can be in open state in case of resuming with different serializer version. - if (rec.type() == SWITCH_SEGMENT_RECORD && !currHnd.resume) - seg = buf.offerSafe(rec.size()); - else - seg = buf.offer(rec.size()); - - FileWALPointer ptr = null; - - if (seg != null) { - try { - int pos = (int)(seg.position() - rec.size()); - - ByteBuffer buf = seg.buffer(); - - if (buf == null) - return null; // Can not write to this segment, need to switch to the next one. - - ptr = new FileWALPointer(getSegmentId(), pos, rec.size()); - - rec.position(ptr); - - fillBuffer(buf, rec); - - if (mmap) { - // written field must grow only, but segment with greater position can be serialized - // earlier than segment with smaller position. - while (true) { - long written0 = written; - - if (seg.position() > written0) { - if (WRITTEN_UPD.compareAndSet(this, written0, seg.position())) - break; - } - else - break; - } - } - - return ptr; - } - finally { - seg.release(); - - if (mode == WALMode.BACKGROUND && rec instanceof CheckpointRecord) - flushOrWait(ptr); - } - } - else - walWriter.flushAll(); - } - } - - /** - * Flush or wait for concurrent flush completion. - * - * @param ptr Pointer. - */ - private void flushOrWait(FileWALPointer ptr) throws IgniteCheckedException { - if (ptr != null) { - // If requested obsolete file index, it must be already flushed by close. - if (ptr.index() != getSegmentId()) - return; - } - - flush(ptr); - } - - /** - * @param ptr Pointer. - */ - private void flush(FileWALPointer ptr) throws IgniteCheckedException { - if (ptr == null) { // Unconditional flush. - walWriter.flushAll(); - - return; - } - - assert ptr.index() == getSegmentId(); - - walWriter.flushBuffer(ptr.fileOffset()); - } - - /** - * @param buf Buffer. - * @param rec WAL record. - * @throws IgniteCheckedException If failed. - */ - private void fillBuffer(ByteBuffer buf, WALRecord rec) throws IgniteCheckedException { - try { - serializer.writeRecord(rec, buf); - } - catch (RuntimeException e) { - throw new IllegalStateException("Failed to write record: " + rec, e); - } - } - - /** - * Non-blocking check if this pointer needs to be sync'ed. - * - * @param ptr WAL pointer to check. - * @return {@code False} if this pointer has been already sync'ed. - */ - private boolean needFsync(FileWALPointer ptr) { - // If index has changed, it means that the log was rolled over and already sync'ed. - // If requested position is smaller than last sync'ed, it also means all is good. - // If position is equal, then our record is the last not synced. - return getSegmentId() == ptr.index() && lastFsyncPos <= ptr.fileOffset(); - } - - /** - * @return Pointer to the end of the last written record (probably not fsync-ed). - */ - private FileWALPointer position() { - lock.lock(); - - try { - return new FileWALPointer(getSegmentId(), (int)written, 0); - } - finally { - lock.unlock(); - } - } - - /** - * @param ptr Pointer to sync. - * @throws StorageException If failed. - */ - private void fsync(FileWALPointer ptr) throws StorageException, IgniteCheckedException { - lock.lock(); - - try { - if (ptr != null) { - if (!needFsync(ptr)) - return; - - if (fsyncDelay > 0 && !stop.get()) { - // Delay fsync to collect as many updates as possible: trade latency for throughput. - U.await(fsync, fsyncDelay, TimeUnit.NANOSECONDS); - - if (!needFsync(ptr)) - return; - } - } - - flushOrWait(ptr); - - if (stop.get()) - return; - - long lastFsyncPos0 = lastFsyncPos; - long written0 = written; - - if (lastFsyncPos0 != written0) { - // Fsync position must be behind. - assert lastFsyncPos0 < written0 : "lastFsyncPos=" + lastFsyncPos0 + ", written=" + written0; - - boolean metricsEnabled = metrics.metricsEnabled(); - - long start = metricsEnabled ? System.nanoTime() : 0; - - if (mmap) { - long pos = ptr == null ? -1 : ptr.fileOffset(); - - List<SegmentedRingByteBuffer.ReadSegment> segs = buf.poll(pos); - - if (segs != null) { - assert segs.size() == 1; - - SegmentedRingByteBuffer.ReadSegment seg = segs.get(0); - - int off = seg.buffer().position(); - int len = seg.buffer().limit() - off; - - fsync((MappedByteBuffer)buf.buf, off, len); - - seg.release(); - } - } - else - walWriter.force(); - - lastFsyncPos = written; - - if (fsyncDelay > 0) - fsync.signalAll(); - - long end = metricsEnabled ? System.nanoTime() : 0; - - if (metricsEnabled) - metrics.onFsync(end - start); - } - } - finally { - lock.unlock(); - } - } - - /** - * @param buf Mapped byte buffer.. - * @param off Offset. - * @param len Length. - */ - private void fsync(MappedByteBuffer buf, int off, int len) throws IgniteCheckedException { - try { - long mappedOff = (Long)mappingOffset.invoke(buf); - - assert mappedOff == 0 : mappedOff; - - long addr = (Long)mappingAddress.invoke(buf, mappedOff); - - long delta = (addr + off) % PAGE_SIZE; - - long alignedAddr = (addr + off) - delta; - - force0.invoke(buf, fd.get(buf), alignedAddr, len + delta); - } - catch (IllegalAccessException | InvocationTargetException e) { - throw new IgniteCheckedException(e); - } - } - - /** - * @return {@code true} If this thread actually closed the segment. - * @throws IgniteCheckedException If failed. - * @throws StorageException If failed. - */ - private boolean close(boolean rollOver) throws IgniteCheckedException, StorageException { - if (stop.compareAndSet(false, true)) { - lock.lock(); - - try { - flushOrWait(null); - - try { - RecordSerializer backwardSerializer = new RecordSerializerFactoryImpl(cctx) - .createSerializer(serializerVer); - - SwitchSegmentRecord segmentRecord = new SwitchSegmentRecord(); - - int switchSegmentRecSize = backwardSerializer.size(segmentRecord); - - if (rollOver && written < (maxWalSegmentSize - switchSegmentRecSize)) { - segmentRecord.size(switchSegmentRecSize); - - WALPointer segRecPtr = addRecord(segmentRecord); - - if (segRecPtr != null) - fsync((FileWALPointer)segRecPtr); - } - - if (mmap) { - List<SegmentedRingByteBuffer.ReadSegment> segs = buf.poll(maxWalSegmentSize); - - if (segs != null) { - assert segs.size() == 1; - - segs.get(0).release(); - } - } - - // Do the final fsync. - if (mode != WALMode.NONE) { - if (mmap) - ((MappedByteBuffer)buf.buf).force(); - else - fileIO.force(); - - lastFsyncPos = written; - } - - if (mmap) { - try { - fileIO.close(); - } - catch (IOException ignore) { - // No-op. - } - } - else { - walWriter.close(); - - if (!rollOver) - buf.free(); - } - } - catch (IOException e) { - throw new StorageException("Failed to close WAL write handle [idx=" + getSegmentId() + "]", e); - } - - if (log.isDebugEnabled()) - log.debug("Closed WAL write handle [idx=" + getSegmentId() + "]"); - - return true; - } - finally { - if (mmap) - buf.free(); - - lock.unlock(); - } - } - else - return false; - } - - /** - * Signals next segment available to wake up other worker threads waiting for WAL to write - */ - private void signalNextAvailable() { - lock.lock(); - - try { - assert cctx.kernalContext().invalid() || - written == lastFsyncPos || mode != WALMode.FSYNC : - "fsync [written=" + written + ", lastFsync=" + lastFsyncPos + ", idx=" + getSegmentId() + ']'; - - fileIO = null; - - nextSegment.signalAll(); - } - finally { - lock.unlock(); - } - } - - /** - * - */ - private void awaitNext() { - lock.lock(); - - try { - while (fileIO != null) - U.awaitQuiet(nextSegment); - } - finally { - lock.unlock(); - } - } - - /** - * @return Safely reads current position of the file channel as String. Will return "null" if channel is null. - */ - private String safePosition() { - FileIO io = fileIO; - - if (io == null) - return "null"; - - try { - return String.valueOf(io.position()); - } - catch (IOException e) { - return "{Failed to read channel position: " + e.getMessage() + '}'; - } - } - } - - /** * Iterator over WAL-log. */ private static class RecordsIterator extends AbstractWalRecordsIterator { @@ -3282,7 +2671,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl FileWriteHandle hnd = currentHandle(); try { - hnd.flush(null); + hnd.flushAll(); } catch (Exception e) { U.warn(log, "Failed to flush WAL record queue", e); @@ -3290,344 +2679,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** - * WAL writer worker. - */ - private class WALWriter extends GridWorker { - /** Unconditional flush. */ - private static final long UNCONDITIONAL_FLUSH = -1L; - - /** File close. */ - private static final long FILE_CLOSE = -2L; - - /** File force. */ - private static final long FILE_FORCE = -3L; - - /** Err. */ - private volatile Throwable err; - - //TODO: replace with GC free data structure. - /** Parked threads. */ - final Map<Thread, Long> waiters = new ConcurrentHashMap<>(); - - /** - * Default constructor. - * - * @param log Logger. - */ - WALWriter(IgniteLogger log) { - super(cctx.igniteInstanceName(), "wal-write-worker%" + cctx.igniteInstanceName(), log, - cctx.kernalContext().workersRegistry()); - } - - /** {@inheritDoc} */ - @Override protected void body() { - Throwable err = null; - - try { - while (!isCancelled()) { - onIdle(); - - while (waiters.isEmpty()) { - if (!isCancelled()) { - blockingSectionBegin(); - - try { - LockSupport.park(); - } - finally { - blockingSectionEnd(); - } - } - else { - unparkWaiters(Long.MAX_VALUE); - - return; - } - } - - Long pos = null; - - for (Long val : waiters.values()) { - if (val > Long.MIN_VALUE) - pos = val; - } - - updateHeartbeat(); - - if (pos == null) - continue; - else if (pos < UNCONDITIONAL_FLUSH) { - try { - assert pos == FILE_CLOSE || pos == FILE_FORCE : pos; - - if (pos == FILE_CLOSE) - currHnd.fileIO.close(); - else if (pos == FILE_FORCE) - currHnd.fileIO.force(); - } - catch (IOException e) { - log.error("Exception in WAL writer thread: ", e); - - err = e; - - unparkWaiters(Long.MAX_VALUE); - - return; - } - - unparkWaiters(pos); - } - - updateHeartbeat(); - - List<SegmentedRingByteBuffer.ReadSegment> segs = currentHandle().buf.poll(pos); - - if (segs == null) { - unparkWaiters(pos); - - continue; - } - - for (int i = 0; i < segs.size(); i++) { - SegmentedRingByteBuffer.ReadSegment seg = segs.get(i); - - updateHeartbeat(); - - try { - writeBuffer(seg.position(), seg.buffer()); - } - catch (Throwable e) { - log.error("Exception in WAL writer thread:", e); - - err = e; - } - finally { - seg.release(); - - long p = pos <= UNCONDITIONAL_FLUSH || err != null ? Long.MAX_VALUE : currentHandle().written; - - unparkWaiters(p); - } - } - } - } - catch (Throwable t) { - err = t; - } - finally { - unparkWaiters(Long.MAX_VALUE); - - if (err == null && !isCancelled) - err = new IllegalStateException("Worker " + name() + " is terminated unexpectedly"); - - if (err instanceof OutOfMemoryError) - cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, err)); - else if (err != null) - cctx.kernalContext().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err)); - } - } - - /** - * Shutdowns thread. - */ - public void shutdown() throws IgniteInterruptedCheckedException { - U.cancel(this); - - LockSupport.unpark(runner()); - - U.join(runner()); - } - - /** - * Unparks waiting threads. - * - * @param pos Pos. - */ - private void unparkWaiters(long pos) { - assert pos > Long.MIN_VALUE : pos; - - for (Map.Entry<Thread, Long> e : waiters.entrySet()) { - Long val = e.getValue(); - - if (val <= pos) { - if (val != Long.MIN_VALUE) - waiters.put(e.getKey(), Long.MIN_VALUE); - - LockSupport.unpark(e.getKey()); - } - } - } - - /** - * Forces all made changes to the file. - */ - void force() throws IgniteCheckedException { - flushBuffer(FILE_FORCE); - } - - /** - * Closes file. - */ - void close() throws IgniteCheckedException { - flushBuffer(FILE_CLOSE); - } - - /** - * Flushes all data from the buffer. - */ - void flushAll() throws IgniteCheckedException { - flushBuffer(UNCONDITIONAL_FLUSH); - } - - /** - * @param expPos Expected position. - */ - void flushBuffer(long expPos) throws IgniteCheckedException { - if (mmap) - return; - - Throwable err = walWriter.err; - - if (err != null) - cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, err)); - - if (expPos == UNCONDITIONAL_FLUSH) - expPos = (currentHandle().buf.tail()); - - Thread t = Thread.currentThread(); - - waiters.put(t, expPos); - - LockSupport.unpark(walWriter.runner()); - - while (true) { - Long val = waiters.get(t); - - assert val != null : "Only this thread can remove thread from waiters"; - - if (val == Long.MIN_VALUE) { - waiters.remove(t); - - Throwable walWriterError = walWriter.err; - - if (walWriterError != null) - throw new IgniteCheckedException("Flush buffer failed.", walWriterError); - - return; - } - else - LockSupport.park(); - } - } - - /** - * @param pos Position in file to start write from. May be checked against actual position to wait previous - * writes to complete - * @param buf Buffer to write to file - * @throws StorageException If failed. - * @throws IgniteCheckedException If failed. - */ - @SuppressWarnings("TooBroadScope") - private void writeBuffer(long pos, ByteBuffer buf) throws StorageException, IgniteCheckedException { - FileWriteHandle hdl = currentHandle(); - - assert hdl.fileIO != null : "Writing to a closed segment."; - - checkNode(); - - long lastLogged = U.currentTimeMillis(); - - long logBackoff = 2_000; - - // If we were too fast, need to wait previous writes to complete. - while (hdl.written != pos) { - assert hdl.written < pos : "written = " + hdl.written + ", pos = " + pos; // No one can write further than we are now. - - // Permutation occurred between blocks write operations. - // Order of acquiring lock is not the same as order of write. - long now = U.currentTimeMillis(); - - if (now - lastLogged >= logBackoff) { - if (logBackoff < 60 * 60_000) - logBackoff *= 2; - - U.warn(log, "Still waiting for a concurrent write to complete [written=" + hdl.written + - ", pos=" + pos + ", lastFsyncPos=" + hdl.lastFsyncPos + ", stop=" + hdl.stop.get() + - ", actualPos=" + hdl.safePosition() + ']'); - - lastLogged = now; - } - - checkNode(); - } - - // Do the write. - int size = buf.remaining(); - - assert size > 0 : size; - - try { - assert hdl.written == hdl.fileIO.position(); - - hdl.written += hdl.fileIO.writeFully(buf); - - metrics.onWalBytesWritten(size); - - assert hdl.written == hdl.fileIO.position(); - } - catch (IOException e) { - StorageException se = new StorageException("Failed to write buffer.", e); - - cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, se)); - - throw se; - } - } - } - - /** - * Syncs WAL segment file. - */ - private class WalSegmentSyncer extends GridWorker { - /** Sync timeout. */ - long syncTimeout; - - /** - * @param igniteInstanceName Ignite instance name. - * @param log Logger. - */ - public WalSegmentSyncer(String igniteInstanceName, IgniteLogger log) { - super(igniteInstanceName, "wal-segment-syncer", log); - - syncTimeout = Math.max(IgniteSystemProperties.getLong(IGNITE_WAL_SEGMENT_SYNC_TIMEOUT, - DFLT_WAL_SEGMENT_SYNC_TIMEOUT), 100L); - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { - while (!isCancelled()) { - sleep(syncTimeout); - - try { - flush(null, true); - } - catch (IgniteCheckedException e) { - U.error(log, "Exception when flushing WAL.", e); - } - } - } - - /** Shutted down the worker. */ - private void shutdown() { - synchronized (this) { - U.cancel(this); - } - - U.join(this, log); - } - } - - /** * Scans provided folder for a WAL segment files * @param walFilesDir directory to scan * @return found WAL file descriptors