http://git-wip-us.apache.org/repos/asf/ignite/blob/2f72fe75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java index 3d0b8b9..d7047cf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java @@ -91,6 +91,10 @@ import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStor import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings; import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException; import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32; +import org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput; +import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentFileInputFactory; +import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO; +import org.apache.ignite.internal.processors.cache.persistence.wal.io.SimpleSegmentFileInputFactory; import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord; import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer; import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory; @@ -260,6 +264,9 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda /** Factory to provide I/O interfaces for 
read/write operations with files */ private volatile FileIOFactory ioFactory; + /** Factory to provide I/O interfaces for read primitives with files */ + private final SegmentFileInputFactory segmentFileInputFactory; + /** Updater for {@link #currentHnd}, used for verify there are no concurrent update for current log segment handle */ private static final AtomicReferenceFieldUpdater<FsyncModeFileWriteAheadLogManager, FileWriteHandle> currentHndUpd = AtomicReferenceFieldUpdater.newUpdater(FsyncModeFileWriteAheadLogManager.class, FileWriteHandle.class, "currentHnd"); @@ -342,6 +349,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda fsyncDelay = dsCfg.getWalFsyncDelayNanos(); alwaysWriteFullPages = dsCfg.isAlwaysWriteFullPages(); ioFactory = dsCfg.getFileIOFactory(); + segmentFileInputFactory = new SimpleSegmentFileInputFactory(); walAutoArchiveAfterInactivity = dsCfg.getWalAutoArchiveAfterInactivity(); evt = ctx.event(); @@ -769,7 +777,8 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda ioFactory, archiver, decompressor, - log + log, + segmentFileInputFactory ); } @@ -831,7 +840,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda FileWriteHandle cur = currentHnd; - return cur != null && cur.idx >= absIdx; + return cur != null && cur.getSegmentId() >= absIdx; } /** {@inheritDoc} */ @@ -1119,9 +1128,9 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda if (metrics.metricsEnabled()) metrics.onWallRollOver(); - FileWriteHandle next = initNextWriteHandle(cur.idx); + FileWriteHandle next = initNextWriteHandle(cur.getSegmentId()); - if (next.idx - lashCheckpointFileIdx() >= maxSegCountWithoutCheckpoint) + if (next.getSegmentId() - lashCheckpointFileIdx() >= maxSegCountWithoutCheckpoint) cctx.database().forceCheckpoint("too big size of WAL without checkpoint"); boolean swapped = currentHndUpd.compareAndSet(this, hnd, next); @@ -1165,7 +1174,7 
@@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda int len = lastReadPtr == null ? 0 : lastReadPtr.length(); try { - FileIO fileIO = ioFactory.create(curFile); + SegmentIO fileIO = new SegmentIO(absIdx, ioFactory.create(curFile)); try { int serVer = serializerVersion; @@ -1173,7 +1182,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda // If we have existing segment, try to read version from it. if (lastReadPtr != null) { try { - serVer = readSegmentHeader(fileIO, absIdx).getSerializerVersion(); + serVer = readSegmentHeader(fileIO, segmentFileInputFactory).getSerializerVersion(); } catch (SegmentEofException | EOFException ignore) { serVer = serializerVersion; @@ -1188,7 +1197,6 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda FileWriteHandle hnd = new FileWriteHandle( fileIO, - absIdx, offset + len, maxWalSegmentSize, ser); @@ -1238,11 +1246,10 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda if (log.isDebugEnabled()) log.debug("Switching to a new WAL segment: " + nextFile.getAbsolutePath()); - FileIO fileIO = ioFactory.create(nextFile); + SegmentIO fileIO = new SegmentIO(curIdx + 1, ioFactory.create(nextFile)); FileWriteHandle hnd = new FileWriteHandle( fileIO, - curIdx + 1, 0, maxWalSegmentSize, serializer); @@ -1886,7 +1893,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda FileDescriptor[] alreadyCompressed = scan(walArchiveDir.listFiles(WAL_SEGMENT_FILE_COMPACTED_FILTER)); if (alreadyCompressed.length > 0) - lastCompressedIdx = alreadyCompressed[alreadyCompressed.length - 1].getIdx(); + lastCompressedIdx = alreadyCompressed[alreadyCompressed.length - 1].idx(); } /** @@ -2039,7 +2046,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda int segmentSerializerVer; try (FileIO fileIO = ioFactory.create(raw)) { - segmentSerializerVer = readSegmentHeader(fileIO, 
nextSegment).getSerializerVersion(); + segmentSerializerVer = readSegmentHeader(new SegmentIO(nextSegment, fileIO), segmentFileInputFactory).getSerializerVersion(); } try (ZipOutputStream zos = new ZipOutputStream(new BufferedOutputStream(new FileOutputStream(zip)))) { @@ -2326,18 +2333,20 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda */ private abstract static class FileHandle { /** I/O interface for read/write operations with file */ - protected FileIO fileIO; - - /** Absolute WAL segment file index (incremental counter) */ - protected final long idx; + protected SegmentIO fileIO; /** * @param fileIO I/O interface for read/write operations of FileHandle. - * @param idx Absolute WAL segment file index (incremental counter). */ - private FileHandle(FileIO fileIO, long idx) { + private FileHandle(SegmentIO fileIO) { this.fileIO = fileIO; - this.idx = idx; + } + + /** + * @return Current segment id. + */ + public long getSegmentId(){ + return fileIO.getSegmentId(); } } @@ -2359,17 +2368,15 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda /** * @param fileIO I/O interface for read/write operations of FileHandle. - * @param idx Absolute WAL segment file index (incremental counter). * @param ser Entry serializer. * @param in File input. */ ReadFileHandle( - FileIO fileIO, - long idx, - RecordSerializer ser, - FileInput in + SegmentIO fileIO, + RecordSerializer ser, + FileInput in ) { - super(fileIO, idx); + super(fileIO); this.ser = ser; this.in = in; @@ -2389,7 +2396,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda /** {@inheritDoc} */ @Override public long idx() { - return idx; + return getSegmentId(); } /** {@inheritDoc} */ @@ -2456,20 +2463,18 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda /** * @param fileIO I/O file interface to use - * @param idx Absolute WAL segment file index for easy access. * @param pos Position. 
* @param maxSegmentSize Max segment size. * @param serializer Serializer. * @throws IOException If failed. */ private FileWriteHandle( - FileIO fileIO, - long idx, + SegmentIO fileIO, long pos, long maxSegmentSize, RecordSerializer serializer ) throws IOException { - super(fileIO, idx); + super(fileIO); assert serializer != null; @@ -2478,7 +2483,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda this.maxSegmentSize = maxSegmentSize; this.serializer = serializer; - head.set(new FakeRecord(new FileWALPointer(idx, (int)pos, 0), false)); + head.set(new FakeRecord(new FileWALPointer(fileIO.getSegmentId(), (int)pos, 0), false)); written = pos; lastFsyncPos = pos; } @@ -2494,15 +2499,15 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda assert fileIO.position() == 0 : "Serializer version can be written only at the begin of file " + fileIO.position(); - long updatedPosition = FsyncModeFileWriteAheadLogManager.writeSerializerVersion(fileIO, idx, + long updatedPosition = FsyncModeFileWriteAheadLogManager.writeSerializerVersion(fileIO, getSegmentId(), serializer.version(), mode); written = updatedPosition; lastFsyncPos = updatedPosition; - head.set(new FakeRecord(new FileWALPointer(idx, (int)updatedPosition, 0), false)); + head.set(new FakeRecord(new FileWALPointer(getSegmentId(), (int)updatedPosition, 0), false)); } catch (IOException e) { - throw new IOException("Unable to write serializer version for segment " + idx, e); + throw new IOException("Unable to write serializer version for segment " + getSegmentId(), e); } } @@ -2558,7 +2563,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda rec.previous(h); FileWALPointer ptr = new FileWALPointer( - idx, + getSegmentId(), (int)nextPos, rec.size()); @@ -2588,7 +2593,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda if (ptr != null) { // If requested obsolete file index, it must be already 
flushed by close. - if (ptr.index() != idx) + if (ptr.index() != getSegmentId()) return; expWritten = ptr.fileOffset(); @@ -2647,7 +2652,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda } } - assert ptr.index() == idx; + assert ptr.index() == getSegmentId(); for (; ; ) { WALRecord h = head.get(); @@ -2684,7 +2689,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda // Fail-fast before CAS. checkNode(); - if (!head.compareAndSet(expHead, new FakeRecord(new FileWALPointer(idx, (int)nextPosition(expHead), 0), stop))) + if (!head.compareAndSet(expHead, new FakeRecord(new FileWALPointer(getSegmentId(), (int)nextPosition(expHead), 0), stop))) return false; if (expHead.chainSize() == 0) @@ -2782,7 +2787,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda // If index has changed, it means that the log was rolled over and already sync'ed. // If requested position is smaller than last sync'ed, it also means all is good. // If position is equal, then our record is the last not synced. 
- return idx == ptr.index() && lastFsyncPos <= ptr.fileOffset(); + return getSegmentId() == ptr.index() && lastFsyncPos <= ptr.fileOffset(); } /** @@ -2792,7 +2797,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda lock.lock(); try { - return new FileWALPointer(idx, (int)written, 0); + return new FileWALPointer(getSegmentId(), (int)written, 0); } finally { lock.unlock(); @@ -2881,7 +2886,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda if (rollOver && written < (maxSegmentSize - switchSegmentRecSize)) { final ByteBuffer buf = ByteBuffer.allocate(switchSegmentRecSize); - segmentRecord.position(new FileWALPointer(idx, (int)written, switchSegmentRecSize)); + segmentRecord.position(new FileWALPointer(getSegmentId(), (int)written, switchSegmentRecSize)); backwardSerializer.writeRecord(segmentRecord, buf); buf.rewind(); @@ -2904,11 +2909,11 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda } } catch (IOException e) { - throw new StorageException("Failed to close WAL write handle [idx=" + idx + "]", e); + throw new StorageException("Failed to close WAL write handle [idx=" + getSegmentId() + "]", e); } if (log.isDebugEnabled()) - log.debug("Closed WAL write handle [idx=" + idx + "]"); + log.debug("Closed WAL write handle [idx=" + getSegmentId() + "]"); return true; } @@ -2943,7 +2948,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda fileIO.close(); } catch (IOException e) { - U.error(log, "Failed to close WAL file [idx=" + idx + ", fileIO=" + fileIO + "]", e); + U.error(log, "Failed to close WAL file [idx=" + getSegmentId() + ", fileIO=" + fileIO + "]", e); } } @@ -3160,7 +3165,8 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda * @param serializerFactory Serializer factory. * @param archiver Archiver. * @param decompressor Decompressor. 
- *@param log Logger @throws IgniteCheckedException If failed to initialize WAL segment. + * @param log Logger @throws IgniteCheckedException If failed to initialize WAL segment. + * @param segmentFileInputFactory Segment file input factory. */ private RecordsIterator( GridCacheSharedContext cctx, @@ -3173,13 +3179,17 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda FileIOFactory ioFactory, FileArchiver archiver, FileDecompressor decompressor, - IgniteLogger log + IgniteLogger log, + SegmentFileInputFactory segmentFileInputFactory ) throws IgniteCheckedException { - super(log, + super( + log, cctx, serializerFactory, ioFactory, - psCfg.getWalRecordIteratorBufferSize()); + psCfg.getWalRecordIteratorBufferSize(), + segmentFileInputFactory + ); this.walWorkDir = walWorkDir; this.walArchiveDir = walArchiveDir; this.psCfg = psCfg; @@ -3401,9 +3411,9 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda } /** {@inheritDoc} */ - @Override protected AbstractReadFileHandle createReadFileHandle(FileIO fileIO, long idx, + @Override protected AbstractReadFileHandle createReadFileHandle(SegmentIO fileIO, RecordSerializer ser, FileInput in) { - return new ReadFileHandle(fileIO, idx, ser, in); + return new ReadFileHandle(fileIO, ser, in); } }
http://git-wip-us.apache.org/repos/asf/ignite/blob/2f72fe75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentArchivedMonitor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentArchivedMonitor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentArchivedMonitor.java deleted file mode 100644 index 81ecd41..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentArchivedMonitor.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ignite.internal.processors.cache.persistence.wal; - -import org.apache.ignite.internal.IgniteInterruptedCheckedException; - -/** - * Next WAL segment archived monitor. Manages last archived index, allows to emulate archivation in no-archiver mode. - * Monitor which is notified each time WAL segment is archived. - */ -class SegmentArchivedMonitor { - /** - * Last archived file absolute index, 0-based. Write is quarded by {@code this}. 
Negative value indicates there are - * no segments archived. - */ - private volatile long lastAbsArchivedIdx = -1; - - /** - * @return Last archived segment absolute index. - */ - long lastArchivedAbsoluteIndex() { - return lastAbsArchivedIdx; - } - - /** - * @param lastAbsArchivedIdx new value of last archived segment index - */ - synchronized void setLastArchivedAbsoluteIndex(long lastAbsArchivedIdx) { - this.lastAbsArchivedIdx = lastAbsArchivedIdx; - - notifyAll(); - } - - /** - * Method will wait activation of particular WAL segment index. - * - * @param awaitIdx absolute index {@link #lastArchivedAbsoluteIndex()} to become true. - * @throws IgniteInterruptedCheckedException if interrupted. - */ - synchronized void awaitSegmentArchived(long awaitIdx) throws IgniteInterruptedCheckedException { - while (lastArchivedAbsoluteIndex() < awaitIdx) { - try { - wait(2000); - } - catch (InterruptedException e) { - throw new IgniteInterruptedCheckedException(e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/2f72fe75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentReservationStorage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentReservationStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentReservationStorage.java deleted file mode 100644 index 12c4b4f..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentReservationStorage.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ignite.internal.processors.cache.persistence.wal; - -import java.util.NavigableMap; -import java.util.TreeMap; - -/** - * Segment reservations storage: Protects WAL segments from deletion during WAL log cleanup. - */ -class SegmentReservationStorage { - /** - * Maps absolute segment index to reservation counter. If counter > 0 then we wouldn't delete all segments - * which has index >= reserved segment index. Guarded by {@code this}. - */ - private NavigableMap<Long, Integer> reserved = new TreeMap<>(); - - /** - * @param absIdx Index for reservation. - */ - synchronized void reserve(long absIdx) { - reserved.merge(absIdx, 1, (a, b) -> a + b); - } - - /** - * Checks if segment is currently reserved (protected from deletion during WAL cleanup). - * @param absIdx Index for check reservation. - * @return {@code True} if index is reserved. - */ - synchronized boolean reserved(long absIdx) { - return reserved.floorKey(absIdx) != null; - } - - /** - * @param absIdx Reserved index. 
- */ - synchronized void release(long absIdx) { - Integer cur = reserved.get(absIdx); - - assert cur != null && cur >= 1 : "cur=" + cur + ", absIdx=" + absIdx; - - if (cur == 1) - reserved.remove(absIdx); - else - reserved.put(absIdx, cur - 1); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/2f72fe75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentRouter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentRouter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentRouter.java new file mode 100644 index 0000000..2d47e9d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentRouter.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.wal; + +import java.io.File; +import java.io.FileNotFoundException; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentAware; + +import static org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor.fileName; + +/** + * Class for managing segment file locations. + */ +public class SegmentRouter { + /** */ + public static final String ZIP_SUFFIX = ".zip"; + /** */ + private File walWorkDir; + + /** WAL archive directory (including consistent ID as subfolder) */ + private File walArchiveDir; + + /** Holder of actual information of latest manipulation on WAL segments. */ + private SegmentAware segmentAware; + + /** */ + private DataStorageConfiguration dsCfg; + + /** + * @param walWorkDir WAL work directory. + * @param walArchiveDir WAL archive directory. + * @param segmentAware Holder of actual information of latest manipulation on WAL segments. + * @param dsCfg Data storage configuration. + */ + public SegmentRouter( + File walWorkDir, + File walArchiveDir, + SegmentAware segmentAware, + DataStorageConfiguration dsCfg) { + this.walWorkDir = walWorkDir; + this.walArchiveDir = walArchiveDir; + this.segmentAware = segmentAware; + this.dsCfg = dsCfg; + } + + /** + * Find the file which represents the given segment. + * + * @param segmentId Segment for searching. + * @return Actual file description. + * @throws FileNotFoundException If file does not exist.
+ */ + public FileDescriptor findSegment(long segmentId) throws FileNotFoundException { + FileDescriptor fd; + + if (segmentAware.lastArchivedAbsoluteIndex() >= segmentId) + fd = new FileDescriptor(new File(walArchiveDir, fileName(segmentId))); + else + fd = new FileDescriptor(new File(walWorkDir, fileName(segmentId % dsCfg.getWalSegments())), segmentId); + + if (!fd.file().exists()) { + FileDescriptor zipFile = new FileDescriptor(new File(walArchiveDir, fileName(fd.idx()) + ZIP_SUFFIX)); + + if (!zipFile.file().exists()) { + throw new FileNotFoundException("Both compressed and raw segment files are missing in archive " + + "[segmentIdx=" + fd.idx() + "]"); + } + + fd = zipFile; + } + + return fd; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2f72fe75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java index a42eb89..8d1445c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java @@ -25,8 +25,10 @@ import org.apache.ignite.internal.pagemem.wal.WALPointer; import org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; +import 
org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput; +import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO; +import org.apache.ignite.internal.processors.cache.persistence.wal.io.SimpleSegmentFileInputFactory; import org.apache.ignite.internal.processors.cache.persistence.wal.record.RecordTypes; import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer; import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory; @@ -71,7 +73,7 @@ public class SingleSegmentLogicalRecordsIterator extends AbstractWalRecordsItera File archiveDir, CIX1<WALRecord> advanceC ) throws IgniteCheckedException { - super(log, sharedCtx, initLogicalRecordsSerializerFactory(sharedCtx), ioFactory, bufSize); + super(log, sharedCtx, initLogicalRecordsSerializerFactory(sharedCtx), ioFactory, bufSize, new SimpleSegmentFileInputFactory()); curWalSegmIdx = archivedSegIdx; this.archiveDir = archiveDir; @@ -121,9 +123,10 @@ public class SingleSegmentLogicalRecordsIterator extends AbstractWalRecordsItera } /** {@inheritDoc} */ - @Override protected AbstractReadFileHandle createReadFileHandle(FileIO fileIO, long idx, + @Override protected AbstractReadFileHandle createReadFileHandle( + SegmentIO fileIO, RecordSerializer ser, FileInput in) { - return new FileWriteAheadLogManager.ReadFileHandle(fileIO, idx, ser, in); + return new FileWriteAheadLogManager.ReadFileHandle(fileIO, ser, in, null); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/2f72fe75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java new file mode 100644 index 0000000..1ed607e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.processors.cache.persistence.wal.aware; + +import org.apache.ignite.internal.IgniteInterruptedCheckedException; + +/** + * Manages last archived index, allows to emulate archivation in no-archiver mode. Monitor which is notified each time + * WAL segment is archived. + * + * Class for inner usage. + */ +class SegmentArchivedStorage extends SegmentObservable { + /** Segment lock storage: Protects WAL work segments from moving. */ + private final SegmentLockStorage segmentLockStorage; + /** Flag of interrupt waiting on this object. */ + private volatile boolean interrupted; + /** + * Last archived file absolute index, 0-based. Write is quarded by {@code this}. Negative value indicates there are + * no segments archived. 
+ */ + private volatile long lastAbsArchivedIdx = -1; + + /** + * @param segmentLockStorage Protects WAL work segments from moving. + */ + private SegmentArchivedStorage(SegmentLockStorage segmentLockStorage) { + this.segmentLockStorage = segmentLockStorage; + } + + /** + * @param segmentLockStorage Protects WAL work segments from moving. + */ + static SegmentArchivedStorage buildArchivedStorage(SegmentLockStorage segmentLockStorage) { + SegmentArchivedStorage archivedStorage = new SegmentArchivedStorage(segmentLockStorage); + + segmentLockStorage.addObserver(archivedStorage::onSegmentUnlocked); + + return archivedStorage; + } + + /** + * @return Last archived segment absolute index. + */ + long lastArchivedAbsoluteIndex() { + return lastAbsArchivedIdx; + } + + /** + * @param lastAbsArchivedIdx New value of last archived segment index. + */ + synchronized void setLastArchivedAbsoluteIndex(long lastAbsArchivedIdx) { + this.lastAbsArchivedIdx = lastAbsArchivedIdx; + + notifyAll(); + + notifyObservers(lastAbsArchivedIdx); + } + + /** + * Waits until the last archived index reaches the given WAL segment index. + * + * @param awaitIdx Absolute index which {@link #lastArchivedAbsoluteIndex()} must reach. + * @throws IgniteInterruptedCheckedException if interrupted. + */ + synchronized void awaitSegmentArchived(long awaitIdx) throws IgniteInterruptedCheckedException { + while (lastArchivedAbsoluteIndex() < awaitIdx && !interrupted) { + try { + wait(2000); + } + catch (InterruptedException e) { + throw new IgniteInterruptedCheckedException(e); + } + } + + checkInterrupted(); + } + + /** + * Mark segment as moved to archive under lock. + * + * @param toArchive Segment which should be moved to archive. + * @throws IgniteInterruptedCheckedException if interrupted during waiting.
+ */ + synchronized void markAsMovedToArchive(long toArchive) throws IgniteInterruptedCheckedException { + try { + while (segmentLockStorage.locked(toArchive) && !interrupted) + wait(); + } + catch (InterruptedException e) { + throw new IgniteInterruptedCheckedException(e); + } + + //Ignore interrupted flag and force set new value. - legacy logic. + //checkInterrupted(); + + setLastArchivedAbsoluteIndex(toArchive); + } + + /** + * Interrupt waiting on this object. + */ + synchronized void interrupt() { + interrupted = true; + + notifyAll(); + } + + /** + * Check for interrupt flag was set. + */ + private void checkInterrupted() throws IgniteInterruptedCheckedException { + if (interrupted) + throw new IgniteInterruptedCheckedException("Interrupt waiting of change archived idx"); + } + + /** + * Callback for waking up waiters of this object when unlocked happened. + */ + private synchronized void onSegmentUnlocked(long segmentId) { + notifyAll(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2f72fe75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java new file mode 100644 index 0000000..3379b74 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.wal.aware; + +import org.apache.ignite.internal.IgniteInterruptedCheckedException; + +import static org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentArchivedStorage.buildArchivedStorage; +import static org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentCompressStorage.buildCompressStorage; +import static org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentCurrentStateStorage.buildCurrentStateStorage; + +/** + * Holder of actual information of latest manipulation on WAL segments. + */ +public class SegmentAware { + /** Latest truncated segment. */ + private volatile long lastTruncatedArchiveIdx = -1L; + /** Segment reservations storage: Protects WAL segments from deletion during WAL log cleanup. */ + private final SegmentReservationStorage reservationStorage = new SegmentReservationStorage(); + /** Lock on segment protects from archiving segment. */ + private final SegmentLockStorage segmentLockStorage = new SegmentLockStorage(); + /** Manages last archived index, emulates archivation in no-archiver mode. */ + private final SegmentArchivedStorage segmentArchivedStorage = buildArchivedStorage(segmentLockStorage); + /** Storage of actual information about current index of compressed segments. 
*/ + private final SegmentCompressStorage segmentCompressStorage = buildCompressStorage(segmentArchivedStorage); + /** Storage of absolute current segment index. */ + private final SegmentCurrentStateStorage segmentCurrStateStorage; + + /** + * @param walSegmentsCnt Total WAL segments count. + */ + public SegmentAware(int walSegmentsCnt) { + segmentCurrStateStorage = buildCurrentStateStorage(walSegmentsCnt, segmentArchivedStorage); + } + + /** + * Waiting until current WAL index will be greater or equal than given one. + * + * @param absSegIdx Target WAL index. + */ + public void awaitSegment(long absSegIdx) throws IgniteInterruptedCheckedException { + segmentCurrStateStorage.awaitSegment(absSegIdx); + } + + /** + * Calculate next segment index or wait if needed. + * + * @return Next absolute segment index. + */ + public long nextAbsoluteSegmentIndex() throws IgniteInterruptedCheckedException { + return segmentCurrStateStorage.nextAbsoluteSegmentIndex(); + } + + /** + * @return Current WAL index. + */ + public long curAbsWalIdx() { + return segmentCurrStateStorage.curAbsWalIdx(); + } + + /** + * Waiting until archivation of next segment will be allowed. + */ + public long waitNextSegmentForArchivation() throws IgniteInterruptedCheckedException { + return segmentCurrStateStorage.waitNextSegmentForArchivation(); + } + + /** + * Mark segment as moved to archive under lock. + * + * @param toArchive Segment which was should be moved to archive. + * @throws IgniteInterruptedCheckedException if interrupted during waiting. + */ + public void markAsMovedToArchive(long toArchive) throws IgniteInterruptedCheckedException { + segmentArchivedStorage.markAsMovedToArchive(toArchive); + } + + /** + * Method will wait activation of particular WAL segment index. + * + * @param awaitIdx absolute index {@link #lastArchivedAbsoluteIndex()} to become true. + * @throws IgniteInterruptedCheckedException if interrupted. 
+ */ + public void awaitSegmentArchived(long awaitIdx) throws IgniteInterruptedCheckedException { + segmentArchivedStorage.awaitSegmentArchived(awaitIdx); + } + + /** + * Pessimistically tries to reserve segment for compression in order to avoid concurrent truncation. Waits if + * there's no segment to archive right now. + */ + public long waitNextSegmentToCompress() throws IgniteInterruptedCheckedException { + return Math.max(segmentCompressStorage.nextSegmentToCompressOrWait(), lastTruncatedArchiveIdx + 1); + } + + /** + * Force set last compressed segment. + * + * @param lastCompressedIdx Segment which was last compressed. + */ + public void lastCompressedIdx(long lastCompressedIdx) { + segmentCompressStorage.lastCompressedIdx(lastCompressedIdx); + } + + /** + * @return Last compressed segment. + */ + public long lastCompressedIdx() { + return segmentCompressStorage.lastCompressedIdx(); + } + + /** + * Update current WAL index. + * + * @param curAbsWalIdx New current WAL index. + */ + public void curAbsWalIdx(long curAbsWalIdx) { + segmentCurrStateStorage.curAbsWalIdx(curAbsWalIdx); + } + + /** + * @param lastTruncatedArchiveIdx Last truncated segment; + */ + public void lastTruncatedArchiveIdx(long lastTruncatedArchiveIdx) { + this.lastTruncatedArchiveIdx = lastTruncatedArchiveIdx; + } + + /** + * @return Last truncated segment. + */ + public long lastTruncatedArchiveIdx() { + return lastTruncatedArchiveIdx; + } + + /** + * @param lastAbsArchivedIdx New value of last archived segment index. + */ + public void setLastArchivedAbsoluteIndex(long lastAbsArchivedIdx) { + segmentArchivedStorage.setLastArchivedAbsoluteIndex(lastAbsArchivedIdx); + } + + /** + * @return Last archived segment absolute index. + */ + public long lastArchivedAbsoluteIndex() { + return segmentArchivedStorage.lastArchivedAbsoluteIndex(); + } + + /** + * @param absIdx Index for reservation. 
+ */ + public void reserve(long absIdx) { + reservationStorage.reserve(absIdx); + } + + /** + * Checks if segment is currently reserved (protected from deletion during WAL cleanup). + * + * @param absIdx Index for check reservation. + * @return {@code True} if index is reserved. + */ + public boolean reserved(long absIdx) { + return reservationStorage.reserved(absIdx); + } + + /** + * @param absIdx Reserved index. + */ + public void release(long absIdx) { + reservationStorage.release(absIdx); + } + + /** + * Check if WAL segment locked (protected from move to archive) + * + * @param absIdx Index for check reservation. + * @return {@code True} if index is locked. + */ + public boolean locked(long absIdx) { + return segmentLockStorage.locked(absIdx); + } + + /** + * @param absIdx Segment absolute index. + * @return <ul><li>{@code True} if can read, no lock is held, </li><li>{@code false} if work segment, need release + * segment later, use {@link #releaseWorkSegment} for unlock</li> </ul> + */ + public boolean checkCanReadArchiveOrReserveWorkSegment(long absIdx) { + return lastArchivedAbsoluteIndex() >= absIdx || segmentLockStorage.lockWorkSegment(absIdx); + } + + /** + * @param absIdx Segment absolute index. + */ + public void releaseWorkSegment(long absIdx) { + segmentLockStorage.releaseWorkSegment(absIdx); + } + + /** + * Interrupt waiting on related objects. + */ + public void interrupt() { + segmentArchivedStorage.interrupt(); + + segmentCompressStorage.interrupt(); + + segmentCurrStateStorage.interrupt(); + } + + /** + * Interrupt waiting on related objects. 
+ */ + public void forceInterrupt() { + segmentArchivedStorage.interrupt(); + + segmentCompressStorage.interrupt(); + + segmentCurrStateStorage.forceInterrupt(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2f72fe75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java new file mode 100644 index 0000000..30c9a2d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.wal.aware; + +import org.apache.ignite.internal.IgniteInterruptedCheckedException; + +/** + * Storage of actual information about current index of compressed segments. 
+ */ +public class SegmentCompressStorage { + /** Flag of interrupt waiting on this object. */ + private volatile boolean interrupted; + /** Manages last archived index, emulates archivation in no-archiver mode. */ + private final SegmentArchivedStorage segmentArchivedStorage; + /** Last successfully compressed segment. */ + private volatile long lastCompressedIdx = -1L; + + /** + * @param segmentArchivedStorage Storage of last archived segment. + */ + private SegmentCompressStorage(SegmentArchivedStorage segmentArchivedStorage) { + this.segmentArchivedStorage = segmentArchivedStorage; + + // Observer registration is done once by buildCompressStorage(), after construction has completed. + } + + /** + * @param segmentArchivedStorage Storage of last archived segment; the new storage is subscribed to its updates. + */ + static SegmentCompressStorage buildCompressStorage(SegmentArchivedStorage segmentArchivedStorage) { + SegmentCompressStorage storage = new SegmentCompressStorage(segmentArchivedStorage); + + segmentArchivedStorage.addObserver(storage::onSegmentArchived); + + return storage; + } + + /** + * Force set last compressed segment. + * + * @param lastCompressedIdx Segment which was last compressed. + */ + void lastCompressedIdx(long lastCompressedIdx) { + this.lastCompressedIdx = lastCompressedIdx; + } + + /** + * @return Last compressed segment. + */ + long lastCompressedIdx() { + return lastCompressedIdx; + } + + /** + * Takes the segment following the last compressed one and waits until it has been archived. Throws + * {@link IgniteInterruptedCheckedException} if the wait is interrupted or {@link #interrupt()} was called. + */ + synchronized long nextSegmentToCompressOrWait() throws IgniteInterruptedCheckedException { + long segmentToCompress = lastCompressedIdx + 1; + + try { + while ( + segmentToCompress > segmentArchivedStorage.lastArchivedAbsoluteIndex() + && !interrupted + ) + wait(); + } + catch (InterruptedException e) { + throw new IgniteInterruptedCheckedException(e); + } + + checkInterrupted(); + + return segmentToCompress; + } + + /** + * Interrupt waiting on this object.
+ */ + synchronized void interrupt() { + interrupted = true; + + notifyAll(); + } + + /** + * Check for interrupt flag was set. + */ + private void checkInterrupted() throws IgniteInterruptedCheckedException { + if (interrupted) + throw new IgniteInterruptedCheckedException("Interrupt waiting of change compressed idx"); + } + + /** + * Callback for waking up compressor when new segment is archived. + */ + private synchronized void onSegmentArchived(long lastAbsArchivedIdx) { + notifyAll(); + } + +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2f72fe75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCurrentStateStorage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCurrentStateStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCurrentStateStorage.java new file mode 100644 index 0000000..5761ef9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCurrentStateStorage.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.wal.aware; + +import org.apache.ignite.internal.IgniteInterruptedCheckedException; + +/** + * Storage of absolute current segment index. + */ +class SegmentCurrentStateStorage { + /** Flag of interrupt of waiting on this object. */ + private volatile boolean interrupted; + /** Flag of force interrupt of waiting on this object. Needed for uninterrupted waiters. */ + private volatile boolean forceInterrupted; + /** Total WAL segments count. */ + private final int walSegmentsCnt; + /** Manages last archived index, emulates archivation in no-archiver mode. */ + private final SegmentArchivedStorage segmentArchivedStorage; + /** + * Absolute current segment index WAL Manager writes to. Guarded by <code>this</code>. Incremented during rollover. + * Also may be directly set if WAL is resuming logging after start. + */ + private volatile long curAbsWalIdx = -1; + + /** + * @param walSegmentsCnt Total WAL segments count. + * @param segmentArchivedStorage Last archived segment storage. + */ + private SegmentCurrentStateStorage(int walSegmentsCnt, SegmentArchivedStorage segmentArchivedStorage) { + this.walSegmentsCnt = walSegmentsCnt; + this.segmentArchivedStorage = segmentArchivedStorage; + } + + /** + * @param walSegmentsCnt Total WAL segments count. + * @param segmentArchivedStorage Last archived segment storage. + */ + static SegmentCurrentStateStorage buildCurrentStateStorage( + int walSegmentsCnt, + SegmentArchivedStorage segmentArchivedStorage + ) { + + SegmentCurrentStateStorage currStorage = new SegmentCurrentStateStorage(walSegmentsCnt, segmentArchivedStorage); + + segmentArchivedStorage.addObserver(currStorage::onSegmentArchived); + + return currStorage; + } + + /** + * Waiting until current WAL index will be greater or equal than given one. + * + * @param absSegIdx Target WAL index. 
+ */ + synchronized void awaitSegment(long absSegIdx) throws IgniteInterruptedCheckedException { + try { + while (curAbsWalIdx < absSegIdx && !interrupted) + wait(); + } + catch (InterruptedException e) { + throw new IgniteInterruptedCheckedException(e); + } + + checkInterrupted(); + } + + /** + * Waiting until archivation of next segment will be allowed. + */ + synchronized long waitNextSegmentForArchivation() throws IgniteInterruptedCheckedException { + long lastArchivedSegment = segmentArchivedStorage.lastArchivedAbsoluteIndex(); + + //We can archive segment if it less than current work segment so for archivate lastArchiveSegment + 1 + // we should be ensure that currentWorkSegment = lastArchiveSegment + 2 + awaitSegment(lastArchivedSegment + 2); + + return lastArchivedSegment + 1; + } + + /** + * Calculate next segment index or wait if needed. Uninterrupted waiting. - for force interrupt used + * forceInterrupted flag. + * + * @return Next absolute segment index. + */ + synchronized long nextAbsoluteSegmentIndex() throws IgniteInterruptedCheckedException { + curAbsWalIdx++; + + notifyAll(); + + try { + while (curAbsWalIdx - segmentArchivedStorage.lastArchivedAbsoluteIndex() > walSegmentsCnt && !forceInterrupted) + wait(); + } + catch (InterruptedException e) { + throw new IgniteInterruptedCheckedException(e); + } + + if (forceInterrupted) + throw new IgniteInterruptedCheckedException("Interrupt waiting of change archived idx"); + + return curAbsWalIdx; + } + + /** + * Update current WAL index. + * + * @param curAbsWalIdx New current WAL index. + */ + synchronized void curAbsWalIdx(long curAbsWalIdx) { + this.curAbsWalIdx = curAbsWalIdx; + + notifyAll(); + } + + /** + * @return Current WAL index. + */ + long curAbsWalIdx() { + return this.curAbsWalIdx; + } + + /** + * Interrupt waiting on this object. + */ + synchronized void interrupt() { + interrupted = true; + + notifyAll(); + } + + /** + * Interrupt waiting on this object. 
+ */ + synchronized void forceInterrupt() { + interrupted = true; + forceInterrupted = true; + + notifyAll(); + } + + /** + * Callback for waking up awaiting when new segment is archived. + */ + private synchronized void onSegmentArchived(long lastAbsArchivedIdx) { + notifyAll(); + } + + /** + * Check for interrupt flag was set. + */ + private void checkInterrupted() throws IgniteInterruptedCheckedException { + if (interrupted) + throw new IgniteInterruptedCheckedException("Interrupt waiting of change current idx"); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2f72fe75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentLockStorage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentLockStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentLockStorage.java new file mode 100644 index 0000000..2e145e7 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentLockStorage.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.wal.aware; + +import java.util.HashMap; +import java.util.Map; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; + +/** + * Storage of segment locks; a lock on a segment protects it from being archived. + */ +public class SegmentLockStorage extends SegmentObservable { + /** + * Maps absolute segment index to locks counter. Lock on segment protects from archiving segment and may come from + * {@link FileWriteAheadLogManager.RecordsIterator} during WAL replay. Map itself is guarded by <code>this</code>. + */ + private Map<Long, Integer> locked = new HashMap<>(); + + /** + * Check if WAL segment is locked (protected from move to archive). + * + * @param absIdx Absolute segment index to check. + * @return {@code True} if index is locked. + */ + public synchronized boolean locked(long absIdx) { + return locked.containsKey(absIdx); + } + + /** + * @param absIdx Segment absolute index whose lock counter is incremented. + * @return Always {@code false}: the segment is locked as a work segment and the caller must release it later + * with {@link #releaseWorkSegment}. + */ + @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext") + synchronized boolean lockWorkSegment(long absIdx) { + Integer cur = locked.get(absIdx); + + cur = cur == null ? 1 : cur + 1; + + locked.put(absIdx, cur); + + return false; + } + + /** + * @param absIdx Segment absolute index.
+ */ + @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext") + synchronized void releaseWorkSegment(long absIdx) { + Integer cur = locked.get(absIdx); + + assert cur != null && cur >= 1 : "cur=" + cur + ", absIdx=" + absIdx; + + if (cur == 1) + locked.remove(absIdx); + else + locked.put(absIdx, cur - 1); + + notifyObservers(absIdx); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2f72fe75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentObservable.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentObservable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentObservable.java new file mode 100644 index 0000000..ba5ad30 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentObservable.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.wal.aware; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; + +/** + * Implementation of observer-observable pattern. For handling specific changes of segment. + */ +public abstract class SegmentObservable { + /** Observers for handle changes of archived index. */ + private final List<Consumer<Long>> observers = new ArrayList<>(); + + /** + * @param observer Observer for notification about segment's changes. + */ + synchronized void addObserver(Consumer<Long> observer) { + observers.add(observer); + } + + /** + * Notify observers about changes. + * + * @param segmentId Segment which was been changed. + */ + synchronized void notifyObservers(long segmentId) { + observers.forEach(observer -> observer.accept(segmentId)); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2f72fe75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentReservationStorage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentReservationStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentReservationStorage.java new file mode 100644 index 0000000..50c2bbf --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentReservationStorage.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.processors.cache.persistence.wal.aware; + +import java.util.NavigableMap; +import java.util.TreeMap; + +/** + * Segment reservations storage: Protects WAL segments from deletion during WAL log cleanup. + */ +class SegmentReservationStorage { + /** + * Maps absolute segment index to reservation counter. If counter > 0 then segments whose index is >= the + * reserved segment index must not be deleted. Guarded by {@code this}. + */ + private NavigableMap<Long, Integer> reserved = new TreeMap<>(); + + /** + * @param absIdx Index for reservation; its counter is incremented (starting at 1). + */ + synchronized void reserve(long absIdx) { + reserved.merge(absIdx, 1, (a, b) -> a + b); + } + + /** + * Checks if segment is currently reserved (protected from deletion during WAL cleanup). + * + * @param absIdx Index to check. + * @return {@code True} if this index or any lower index is currently reserved. + */ + synchronized boolean reserved(long absIdx) { + return reserved.floorKey(absIdx) != null; + } + + /** + * @param absIdx Reserved index.
+ */ + synchronized void release(long absIdx) { + Integer cur = reserved.get(absIdx); + + assert cur != null && cur >= 1 : "cur=" + cur + ", absIdx=" + absIdx; + + if (cur == 1) + reserved.remove(absIdx); + else + reserved.put(absIdx, cur - 1); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2f72fe75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/FileInput.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/FileInput.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/FileInput.java new file mode 100644 index 0000000..d19d17b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/FileInput.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.wal.io; + +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; +import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput; +import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException; +import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32; +import org.jetbrains.annotations.NotNull; + +/** + * File input, backed by byte buffer file input. + * This class allows to read data by chunks from file and then read primitives. + */ +public interface FileInput extends ByteBufferBackedDataInput { + /** + * File I/O. + */ + FileIO io(); + + /** + * @param pos Position in bytes from file begin. + */ + void seek(long pos) throws IOException; + + /** + * @return Position in the stream. + */ + long position(); + + /** + * @param skipCheck If CRC check should be skipped. + * @return autoclosable fileInput, after its closing crc32 will be calculated and compared with saved one + */ + SimpleFileInput.Crc32CheckingFileInput startRead(boolean skipCheck); + + /** + * Checking of CRC32. + */ + public class Crc32CheckingFileInput implements ByteBufferBackedDataInput, AutoCloseable { + /** */ + private final PureJavaCrc32 crc32 = new PureJavaCrc32(); + + /** Last calc position. */ + private int lastCalcPosition; + + /** Skip crc check. 
*/ + private boolean skipCheck; + + /** */ + private FileInput delegate; + + /** + */ + public Crc32CheckingFileInput(FileInput delegate, boolean skipCheck) { + this.delegate = delegate; + this.lastCalcPosition = delegate.buffer().position(); + this.skipCheck = skipCheck; + } + + /** {@inheritDoc} */ + @Override public void ensure(int requested) throws IOException { + int available = buffer().remaining(); + + if (available >= requested) + return; + + updateCrc(); + + delegate.ensure(requested); + + lastCalcPosition = 0; + } + + /** {@inheritDoc} */ + @Override public void close() throws Exception { + updateCrc(); + + int val = crc32.getValue(); + + int writtenCrc = this.readInt(); + + if ((val ^ writtenCrc) != 0 && !skipCheck) { + // If it last message we will skip it (EOF will be thrown). + ensure(5); + + throw new IgniteDataIntegrityViolationException( + "val: " + val + " writtenCrc: " + writtenCrc + ); + } + } + + /** + * + */ + private void updateCrc() { + if (skipCheck) + return; + + int oldPos = buffer().position(); + + buffer().position(lastCalcPosition); + + crc32.update(delegate.buffer(), oldPos - lastCalcPosition); + + lastCalcPosition = oldPos; + } + + /** {@inheritDoc} */ + @Override public int skipBytes(int n) throws IOException { + ensure(n); + + int skipped = Math.min(buffer().remaining(), n); + + buffer().position(buffer().position() + skipped); + + return skipped; + } + + /** + * {@inheritDoc} + */ + @Override public void readFully(@NotNull byte[] b) throws IOException { + ensure(b.length); + + buffer().get(b); + } + + /** + * {@inheritDoc} + */ + @Override public void readFully(@NotNull byte[] b, int off, int len) throws IOException { + ensure(len); + + buffer().get(b, off, len); + } + + /** + * {@inheritDoc} + */ + @Override public boolean readBoolean() throws IOException { + return readByte() == 1; + } + + /** + * {@inheritDoc} + */ + @Override public byte readByte() throws IOException { + ensure(1); + + return buffer().get(); + } + + /** + * 
{@inheritDoc} + */ + @Override public int readUnsignedByte() throws IOException { + return readByte() & 0xFF; + } + + /** + * {@inheritDoc} + */ + @Override public short readShort() throws IOException { + ensure(2); + + return buffer().getShort(); + } + + /** + * {@inheritDoc} + */ + @Override public int readUnsignedShort() throws IOException { + return readShort() & 0xFFFF; + } + + /** + * {@inheritDoc} + */ + @Override public char readChar() throws IOException { + ensure(2); + + return buffer().getChar(); + } + + /** + * {@inheritDoc} + */ + @Override public int readInt() throws IOException { + ensure(4); + + return buffer().getInt(); + } + + /** + * {@inheritDoc} + */ + @Override public long readLong() throws IOException { + ensure(8); + + return buffer().getLong(); + } + + /** + * {@inheritDoc} + */ + @Override public float readFloat() throws IOException { + ensure(4); + + return buffer().getFloat(); + } + + /** + * {@inheritDoc} + */ + @Override public double readDouble() throws IOException { + ensure(8); + + return buffer().getDouble(); + } + + /** + * {@inheritDoc} + */ + @Override public String readLine() throws IOException { + throw new UnsupportedOperationException(); + } + + /** + * {@inheritDoc} + */ + @Override public String readUTF() throws IOException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public ByteBuffer buffer() { + return delegate.buffer(); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2f72fe75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/LockedReadFileInput.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/LockedReadFileInput.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/LockedReadFileInput.java new file mode 100644 index 0000000..6e5a7a4 --- /dev/null +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/LockedReadFileInput.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.wal.io; + +import java.io.IOException; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; +import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferExpander; +import org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentAware; + +/** + * File input backed by a byte buffer. This class allows reading data from a file in chunks and then reading + * primitives from the buffered data. + * + * This implementation locks the segment only while reading into the buffer and can also switch from reading the + * segment in the work directory to reading it in the archive directory if needed. + */ +final class LockedReadFileInput extends SimpleFileInput { + /** Id of the segment being read. */ + private final long segmentId; + /** Holder of actual information of latest manipulation on WAL segments. */ + private final SegmentAware segmentAware; + /** Factory of file I/O for segment. */ + private final SegmentIoFactory fileIOFactory; + /** {@code true} if the last read was from the archive. 
*/ + private boolean isLastReadFromArchive; + + /** + * @param buf Buffer for reading blocks of data into. + * @param initFileIo Initial File I/O for reading. + * @param segmentAware Holder of actual information of latest manipulation on WAL segments. + * @param segmentIOFactory Factory of file I/O for segment. + * @throws IOException If reading from {@code initFileIo} fails. + */ + LockedReadFileInput( + ByteBufferExpander buf, + SegmentIO initFileIo, + SegmentAware segmentAware, + SegmentIoFactory segmentIOFactory + ) throws IOException { + super(initFileIo, buf); + this.segmentAware = segmentAware; + this.fileIOFactory = segmentIOFactory; + this.segmentId = initFileIo.getSegmentId(); + isLastReadFromArchive = segmentAware.lastArchivedAbsoluteIndex() >= initFileIo.getSegmentId(); + } + + /** + * {@inheritDoc} + * + * When the buffer holds fewer than {@code requested} bytes, asks {@code segmentAware} whether the segment can be + * read from the archive or reserves the work segment, switches to a new file I/O on the first archive read, lets + * the superclass refill the buffer and finally releases the work segment if it was reserved. + */ + @Override public void ensure(int requested) throws IOException { + int available = buffer().remaining(); + + if (available >= requested) + return; + + boolean readArchive = segmentAware.checkCanReadArchiveOrReserveWorkSegment(segmentId); + try { + if (readArchive && !isLastReadFromArchive) { + refreshIO(); + + /* Set only after a successful switch, so that a failed refresh is retried by the next call. */ + isLastReadFromArchive = true; + } + + super.ensure(requested); + } + finally { + if (!readArchive) + segmentAware.releaseWorkSegment(segmentId); + } + } + + /** + * Replaces the current file I/O with a newly built one for the same segment, positioned where the current one is, + * and closes the current one. If positioning or closing fails, the newly built I/O is closed instead of leaked. + * + * @throws IOException If building or positioning the new file I/O, or closing the old one, fails. + */ + private void refreshIO() throws IOException { + FileIO newIo = fileIOFactory.build(segmentId); + + try { + newIo.position(io().position()); + + io().close(); + } + catch (IOException | RuntimeException e) { + try { + newIo.close(); + } + catch (Exception closeErr) { + e.addSuppressed(closeErr); + } + + throw e; + } + + this.io = newIo; + } + + /** + * Resolves the file I/O for a segment. + */ + interface SegmentIoFactory { + /** + * @param segmentId Segment for IO action. + * @return {@link FileIO}. + * @throws IOException If the file I/O could not be created. 
+ */ + FileIO build(long segmentId) throws IOException; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2f72fe75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/LockedSegmentFileInputFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/LockedSegmentFileInputFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/LockedSegmentFileInputFactory.java new file mode 100644 index 0000000..f3cdda7 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/LockedSegmentFileInputFactory.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.wal.io; + +import java.io.IOException; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferExpander; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor; +import org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentAware; +import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentRouter; + +/** + * Implementation of the factory that provides I/O interfaces for reading primitives from files. + * + * Creates {@link FileInput} instances that are able to lock the segment during reading. + */ +public class LockedSegmentFileInputFactory implements SegmentFileInputFactory { + /** Holder of actual information of latest manipulation on WAL segments. */ + private final SegmentAware segmentAware; + /** Manager of segment location. */ + private final SegmentRouter segmentRouter; + /** {@link FileIO} factory definition. */ + private final FileIOFactory fileIOFactory; + + /** + * @param segmentAware Holder of actual information of latest manipulation on WAL segments. + * @param segmentRouter Manager of segment location. + * @param fileIOFactory {@link FileIO} factory definition. 
+ */ + public LockedSegmentFileInputFactory( + SegmentAware segmentAware, + SegmentRouter segmentRouter, + FileIOFactory fileIOFactory) { + this.segmentAware = segmentAware; + this.segmentRouter = segmentRouter; + this.fileIOFactory = fileIOFactory; + } + + /** + * {@inheritDoc} + * + * The created input is given a callback that looks the segment up through the segment router and opens it with + * the file I/O factory. + */ + @Override public FileInput createFileInput(SegmentIO segmentIO, ByteBufferExpander buf) throws IOException { + return new LockedReadFileInput( + buf, + segmentIO, + segmentAware, + id -> { + FileDescriptor segment = segmentRouter.findSegment(id); + + return segment.toIO(fileIOFactory); + } + ); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2f72fe75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/SegmentFileInputFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/SegmentFileInputFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/SegmentFileInputFactory.java new file mode 100644 index 0000000..b688f90 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/SegmentFileInputFactory.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.wal.io; + +import java.io.IOException; +import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferExpander; + +/** + * Factory of {@link FileInput} instances used to read primitives from files. + */ +public interface SegmentFileInputFactory { + /** + * Creates a file input for the given segment I/O. + * + * @param segmentIO FileIO of segment for reading. + * @param buf ByteBuffer wrapper that allows the buffer size to be expanded dynamically. + * @return Instance of {@link FileInput}. + * @throws IOException If an I/O error occurs. + */ + FileInput createFileInput(SegmentIO segmentIO, ByteBufferExpander buf) throws IOException; +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2f72fe75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/SegmentIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/SegmentIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/SegmentIO.java new file mode 100644 index 0000000..d0a6445 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/SegmentIO.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.wal.io; + +import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator; + +/** + * {@link FileIO} decorator for a WAL segment file that additionally carries the id of that segment. + */ +public class SegmentIO extends FileIODecorator { + /** Segment id. */ + private final long segmentId; + + /** + * @param id Segment id. + * @param delegate File I/O delegate that is passed to the decorator superclass. + */ + public SegmentIO(long id, FileIO delegate) { + super(delegate); + segmentId = id; + } + + /** + * @return Segment id. + */ + public long getSegmentId() { + return segmentId; + } +}
