http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java index 2045c35..2e97fd5 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java @@ -22,34 +22,22 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.file.StandardOpenOption; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.UUID; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.zip.CRC32; -import com.codahale.metrics.Timer; - import org.cliffc.high_scale_lib.NonBlockingHashMap; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.config.Schema; +import com.codahale.metrics.Timer; +import org.apache.cassandra.config.*; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.commitlog.CommitLog.Configuration; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.io.FSWriteError; -import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.utils.CLibrary; import org.apache.cassandra.utils.concurrent.OpOrder; import org.apache.cassandra.utils.concurrent.WaitQueue; @@ -66,6 +54,14 @@ public abstract class CommitLogSegment private static final Logger logger = 
LoggerFactory.getLogger(CommitLogSegment.class); private final static long idBase; + + private CDCState cdcState = CDCState.PERMITTED; + public enum CDCState { + PERMITTED, + FORBIDDEN, + CONTAINS + } + private final static AtomicInteger nextId = new AtomicInteger(1); private static long replayLimitId; static @@ -115,18 +111,20 @@ public abstract class CommitLogSegment final FileChannel channel; final int fd; + protected final AbstractCommitLogSegmentManager manager; + ByteBuffer buffer; private volatile boolean headerWritten; final CommitLog commitLog; public final CommitLogDescriptor descriptor; - static CommitLogSegment createSegment(CommitLog commitLog, Runnable onClose) + static CommitLogSegment createSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager, Runnable onClose) { Configuration config = commitLog.configuration; - CommitLogSegment segment = config.useEncryption() ? new EncryptedSegment(commitLog, onClose) - : config.useCompression() ? new CompressedSegment(commitLog, onClose) - : new MemoryMappedSegment(commitLog); + CommitLogSegment segment = config.useEncryption() ? new EncryptedSegment(commitLog, manager, onClose) + : config.useCompression() ? new CompressedSegment(commitLog, manager, onClose) + : new MemoryMappedSegment(commitLog, manager); segment.writeLogHeader(); return segment; } @@ -151,14 +149,16 @@ public abstract class CommitLogSegment /** * Constructs a new segment file. 
*/ - CommitLogSegment(CommitLog commitLog) + CommitLogSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager) { this.commitLog = commitLog; + this.manager = manager; + id = getNextId(); descriptor = new CommitLogDescriptor(id, commitLog.configuration.getCompressorClass(), commitLog.configuration.getEncryptionContext()); - logFile = new File(commitLog.location, descriptor.fileName()); + logFile = new File(manager.storageDirectory, descriptor.fileName()); try { @@ -369,22 +369,11 @@ public abstract class CommitLogSegment } /** - * Completely discards a segment file by deleting it. (Potentially blocking operation) - */ - void discard(boolean deleteFile) - { - close(); - if (deleteFile) - FileUtils.deleteWithConfirm(logFile); - commitLog.allocator.addSize(-onDiskSize()); - } - - /** - * @return the current ReplayPosition for this log segment + * @return the current CommitLogPosition for this log segment */ - public ReplayPosition getContext() + public CommitLogPosition getCurrentCommitLogPosition() { - return new ReplayPosition(id, allocatePosition.get()); + return new CommitLogPosition(id, allocatePosition.get()); } /** @@ -474,13 +463,13 @@ public abstract class CommitLogSegment * @param cfId the column family ID that is now clean * @param context the optional clean offset */ - public synchronized void markClean(UUID cfId, ReplayPosition context) + public synchronized void markClean(UUID cfId, CommitLogPosition context) { if (!cfDirty.containsKey(cfId)) return; - if (context.segment == id) + if (context.segmentId == id) markClean(cfId, context.position); - else if (context.segment > id) + else if (context.segmentId > id) markClean(cfId, Integer.MAX_VALUE); } @@ -565,14 +554,14 @@ public abstract class CommitLogSegment } /** - * Check to see if a certain ReplayPosition is contained by this segment file. + * Check to see if a certain CommitLogPosition is contained by this segment file. 
* - * @param context the replay position to be checked - * @return true if the replay position is contained by this segment file. + * @param context the commit log segment position to be checked + * @return true if the commit log segment position is contained by this segment file. */ - public boolean contains(ReplayPosition context) + public boolean contains(CommitLogPosition context) { - return context.segment == id; + return context.segmentId == id; } // For debugging, not fast @@ -610,12 +599,37 @@ public abstract class CommitLogSegment } } + public CDCState getCDCState() + { + return cdcState; + } + + /** + * Change the current cdcState on this CommitLogSegment. There are some restrictions on state transitions and this + * method is idempotent. + */ + public void setCDCState(CDCState newState) + { + if (newState == cdcState) + return; + + // Also synchronized in CDCSizeTracker.processNewSegment and .processDiscardedSegment + synchronized(this) + { + if (cdcState == CDCState.CONTAINS && newState != CDCState.CONTAINS) + throw new IllegalArgumentException("Cannot transition from CONTAINS to any other state."); + + if (cdcState == CDCState.FORBIDDEN && newState != CDCState.PERMITTED) + throw new IllegalArgumentException("Only transition from FORBIDDEN to PERMITTED is allowed."); + + cdcState = newState; + } + } + /** * A simple class for tracking information about the portion of a segment that has been allocated to a log write. - * The constructor leaves the fields uninitialized for population by CommitlogManager, so that it can be - * stack-allocated by escape analysis in CommitLog.add. 
*/ - static class Allocation + protected static class Allocation { private final CommitLogSegment segment; private final OpOrder.Group appendOp; @@ -652,9 +666,9 @@ public abstract class CommitLogSegment segment.waitForSync(position, waitingOnCommit); } - public ReplayPosition getReplayPosition() + public CommitLogPosition getCommitLogPosition() { - return new ReplayPosition(segment.id, buffer.limit()); + return new CommitLogPosition(segment.id, buffer.limit()); } } }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java deleted file mode 100644 index 4f1166b..0000000 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java +++ /dev/null @@ -1,567 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.db.commitlog; - -import java.io.File; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Iterables; -import com.google.common.util.concurrent.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.config.Schema; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.db.Mutation; -import org.apache.cassandra.io.util.FileUtils; -import org.apache.cassandra.utils.Pair; -import org.apache.cassandra.utils.concurrent.WaitQueue; -import org.apache.cassandra.utils.JVMStabilityInspector; -import org.apache.cassandra.utils.WrappedRunnable; - -import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation; - -/** - * Performs eager-creation of commit log segments in a background thread. All the - * public methods are thread safe. - */ -public class CommitLogSegmentManager -{ - static final Logger logger = LoggerFactory.getLogger(CommitLogSegmentManager.class); - - /** - * Queue of work to be done by the manager thread, also used to wake the thread to perform segment allocation. - */ - private final BlockingQueue<Runnable> segmentManagementTasks = new LinkedBlockingQueue<>(); - - /** Segments that are ready to be used. 
Head of the queue is the one we allocate writes to */ - private final ConcurrentLinkedQueue<CommitLogSegment> availableSegments = new ConcurrentLinkedQueue<>(); - - /** Active segments, containing unflushed data */ - private final ConcurrentLinkedQueue<CommitLogSegment> activeSegments = new ConcurrentLinkedQueue<>(); - - /** The segment we are currently allocating commit log records to */ - private volatile CommitLogSegment allocatingFrom = null; - - private final WaitQueue hasAvailableSegments = new WaitQueue(); - - /** - * Tracks commitlog size, in multiples of the segment size. We need to do this so we can "promise" size - * adjustments ahead of actually adding/freeing segments on disk, so that the "evict oldest segment" logic - * can see the effect of recycling segments immediately (even though they're really happening asynchronously - * on the manager thread, which will take a ms or two). - */ - private final AtomicLong size = new AtomicLong(); - - /** - * New segment creation is initially disabled because we'll typically get some "free" segments - * recycled after log replay. 
- */ - volatile boolean createReserveSegments = false; - - private Thread managerThread; - private volatile boolean run = true; - private final CommitLog commitLog; - - CommitLogSegmentManager(final CommitLog commitLog) - { - this.commitLog = commitLog; - } - - void start() - { - // The run loop for the manager thread - Runnable runnable = new WrappedRunnable() - { - public void runMayThrow() throws Exception - { - while (run) - { - try - { - Runnable task = segmentManagementTasks.poll(); - if (task == null) - { - // if we have no more work to do, check if we should create a new segment - if (!atSegmentLimit() && availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments)) - { - logger.trace("No segments in reserve; creating a fresh one"); - // TODO : some error handling in case we fail to create a new segment - availableSegments.add(CommitLogSegment.createSegment(commitLog, () -> wakeManager())); - hasAvailableSegments.signalAll(); - } - - // flush old Cfs if we're full - long unused = unusedCapacity(); - if (unused < 0) - { - List<CommitLogSegment> segmentsToRecycle = new ArrayList<>(); - long spaceToReclaim = 0; - for (CommitLogSegment segment : activeSegments) - { - if (segment == allocatingFrom) - break; - segmentsToRecycle.add(segment); - spaceToReclaim += DatabaseDescriptor.getCommitLogSegmentSize(); - if (spaceToReclaim + unused >= 0) - break; - } - flushDataFrom(segmentsToRecycle, false); - } - - try - { - // wait for new work to be provided - task = segmentManagementTasks.take(); - } - catch (InterruptedException e) - { - throw new AssertionError(); - } - } - - task.run(); - } - catch (Throwable t) - { - JVMStabilityInspector.inspectThrowable(t); - if (!CommitLog.handleCommitError("Failed managing commit log segments", t)) - return; - // sleep some arbitrary period to avoid spamming CL - Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); - } - } - } - - private boolean atSegmentLimit() - { - return 
CommitLogSegment.usesBufferPool(commitLog) && CompressedSegment.hasReachedPoolLimit(); - } - - }; - - run = true; - - managerThread = new Thread(runnable, "COMMIT-LOG-ALLOCATOR"); - managerThread.start(); - } - - /** - * Reserve space in the current segment for the provided mutation or, if there isn't space available, - * create a new segment. - * - * @return the provided Allocation object - */ - public Allocation allocate(Mutation mutation, int size) - { - CommitLogSegment segment = allocatingFrom(); - - Allocation alloc; - while ( null == (alloc = segment.allocate(mutation, size)) ) - { - // failed to allocate, so move to a new segment with enough room - advanceAllocatingFrom(segment); - segment = allocatingFrom; - } - - return alloc; - } - - // simple wrapper to ensure non-null value for allocatingFrom; only necessary on first call - CommitLogSegment allocatingFrom() - { - CommitLogSegment r = allocatingFrom; - if (r == null) - { - advanceAllocatingFrom(null); - r = allocatingFrom; - } - return r; - } - - /** - * Fetches a new segment from the queue, creating a new one if necessary, and activates it - */ - private void advanceAllocatingFrom(CommitLogSegment old) - { - while (true) - { - CommitLogSegment next; - synchronized (this) - { - // do this in a critical section so we can atomically remove from availableSegments and add to allocatingFrom/activeSegments - // see https://issues.apache.org/jira/browse/CASSANDRA-6557?focusedCommentId=13874432&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13874432 - if (allocatingFrom != old) - return; - next = availableSegments.poll(); - if (next != null) - { - allocatingFrom = next; - activeSegments.add(next); - } - } - - if (next != null) - { - if (old != null) - { - // Now we can run the user defined command just after switching to the new commit log. - // (Do this here instead of in the recycle call so we can get a head start on the archive.) 
- commitLog.archiver.maybeArchive(old); - - // ensure we don't continue to use the old file; not strictly necessary, but cleaner to enforce it - old.discardUnusedTail(); - } - - // request that the CL be synced out-of-band, as we've finished a segment - commitLog.requestExtraSync(); - return; - } - - // no more segments, so register to receive a signal when not empty - WaitQueue.Signal signal = hasAvailableSegments.register(commitLog.metrics.waitingOnSegmentAllocation.time()); - - // trigger the management thread; this must occur after registering - // the signal to ensure we are woken by any new segment creation - wakeManager(); - - // check if the queue has already been added to before waiting on the signal, to catch modifications - // that happened prior to registering the signal; *then* check to see if we've been beaten to making the change - if (!availableSegments.isEmpty() || allocatingFrom != old) - { - signal.cancel(); - // if we've been beaten, just stop immediately - if (allocatingFrom != old) - return; - // otherwise try again, as there should be an available segment - continue; - } - - // can only reach here if the queue hasn't been inserted into - // before we registered the signal, as we only remove items from the queue - // after updating allocatingFrom. Can safely block until we are signalled - // by the allocator that new segments have been published - signal.awaitUninterruptibly(); - } - } - - private void wakeManager() - { - // put a NO-OP on the queue, to trigger management thread (and create a new segment if necessary) - segmentManagementTasks.add(Runnables.doNothing()); - } - - /** - * Switch to a new segment, regardless of how much is left in the current one. 
- * - * Flushes any dirty CFs for this segment and any older segments, and then recycles - * the segments - */ - void forceRecycleAll(Iterable<UUID> droppedCfs) - { - List<CommitLogSegment> segmentsToRecycle = new ArrayList<>(activeSegments); - CommitLogSegment last = segmentsToRecycle.get(segmentsToRecycle.size() - 1); - advanceAllocatingFrom(last); - - // wait for the commit log modifications - last.waitForModifications(); - - // make sure the writes have materialized inside of the memtables by waiting for all outstanding writes - // on the relevant keyspaces to complete - Keyspace.writeOrder.awaitNewBarrier(); - - // flush and wait for all CFs that are dirty in segments up-to and including 'last' - Future<?> future = flushDataFrom(segmentsToRecycle, true); - try - { - future.get(); - - for (CommitLogSegment segment : activeSegments) - for (UUID cfId : droppedCfs) - segment.markClean(cfId, segment.getContext()); - - // now recycle segments that are unused, as we may not have triggered a discardCompletedSegments() - // if the previous active segment was the only one to recycle (since an active segment isn't - // necessarily dirty, and we only call dCS after a flush). - for (CommitLogSegment segment : activeSegments) - if (segment.isUnused()) - recycleSegment(segment); - - CommitLogSegment first; - if ((first = activeSegments.peek()) != null && first.id <= last.id) - logger.error("Failed to force-recycle all segments; at least one segment is still in use with dirty CFs."); - } - catch (Throwable t) - { - // for now just log the error and return false, indicating that we failed - logger.error("Failed waiting for a forced recycle of in-use commit log segments", t); - } - } - - /** - * Indicates that a segment is no longer in use and that it should be recycled. 
- * - * @param segment segment that is no longer in use - */ - void recycleSegment(final CommitLogSegment segment) - { - boolean archiveSuccess = commitLog.archiver.maybeWaitForArchiving(segment.getName()); - if (activeSegments.remove(segment)) - { - // if archiving (command) was not successful then leave the file alone. don't delete or recycle. - discardSegment(segment, archiveSuccess); - } - else - { - logger.warn("segment {} not found in activeSegments queue", segment); - } - } - - /** - * Differs from the above because it can work on any file instead of just existing - * commit log segments managed by this manager. - * - * @param file segment file that is no longer in use. - */ - void recycleSegment(final File file) - { - // (don't decrease managed size, since this was never a "live" segment) - logger.trace("(Unopened) segment {} is no longer needed and will be deleted now", file); - FileUtils.deleteWithConfirm(file); - } - - /** - * Indicates that a segment file should be deleted. - * - * @param segment segment to be discarded - */ - private void discardSegment(final CommitLogSegment segment, final boolean deleteFile) - { - logger.trace("Segment {} is no longer active and will be deleted {}", segment, deleteFile ? "now" : "by the archive script"); - - segmentManagementTasks.add(new Runnable() - { - public void run() - { - segment.discard(deleteFile); - } - }); - } - - /** - * Adjust the tracked on-disk size. Called by individual segments to reflect writes, allocations and discards. - * @param addedSize - */ - void addSize(long addedSize) - { - size.addAndGet(addedSize); - } - - /** - * @return the space (in bytes) used by all segment files. 
- */ - public long onDiskSize() - { - return size.get(); - } - - private long unusedCapacity() - { - long total = DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024; - long currentSize = size.get(); - logger.trace("Total active commitlog segment space used is {} out of {}", currentSize, total); - return total - currentSize; - } - - /** - * @param name the filename to check - * @return true if file is managed by this manager. - */ - public boolean manages(String name) - { - for (CommitLogSegment segment : Iterables.concat(activeSegments, availableSegments)) - if (segment.getName().equals(name)) - return true; - return false; - } - - /** - * Throws a flag that enables the behavior of keeping at least one spare segment - * available at all times. - */ - void enableReserveSegmentCreation() - { - createReserveSegments = true; - wakeManager(); - } - - /** - * Force a flush on all CFs that are still dirty in @param segments. - * - * @return a Future that will finish when all the flushes are complete. - */ - private Future<?> flushDataFrom(List<CommitLogSegment> segments, boolean force) - { - if (segments.isEmpty()) - return Futures.immediateFuture(null); - final ReplayPosition maxReplayPosition = segments.get(segments.size() - 1).getContext(); - - // a map of CfId -> forceFlush() to ensure we only queue one flush per cf - final Map<UUID, ListenableFuture<?>> flushes = new LinkedHashMap<>(); - - for (CommitLogSegment segment : segments) - { - for (UUID dirtyCFId : segment.getDirtyCFIDs()) - { - Pair<String,String> pair = Schema.instance.getCF(dirtyCFId); - if (pair == null) - { - // even though we remove the schema entry before a final flush when dropping a CF, - // it's still possible for a writer to race and finish his append after the flush. 
- logger.trace("Marking clean CF {} that doesn't exist anymore", dirtyCFId); - segment.markClean(dirtyCFId, segment.getContext()); - } - else if (!flushes.containsKey(dirtyCFId)) - { - String keyspace = pair.left; - final ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(dirtyCFId); - // can safely call forceFlush here as we will only ever block (briefly) for other attempts to flush, - // no deadlock possibility since switchLock removal - flushes.put(dirtyCFId, force ? cfs.forceFlush() : cfs.forceFlush(maxReplayPosition)); - } - } - } - - return Futures.allAsList(flushes.values()); - } - - /** - * Stops CL, for testing purposes. DO NOT USE THIS OUTSIDE OF TESTS. - * Only call this after the AbstractCommitLogService is shut down. - */ - public void stopUnsafe(boolean deleteSegments) - { - logger.trace("CLSM closing and clearing existing commit log segments..."); - createReserveSegments = false; - - awaitManagementTasksCompletion(); - - shutdown(); - try - { - awaitTermination(); - } - catch (InterruptedException e) - { - throw new RuntimeException(e); - } - - synchronized (this) - { - for (CommitLogSegment segment : activeSegments) - closeAndDeleteSegmentUnsafe(segment, deleteSegments); - activeSegments.clear(); - - for (CommitLogSegment segment : availableSegments) - closeAndDeleteSegmentUnsafe(segment, deleteSegments); - availableSegments.clear(); - } - - allocatingFrom = null; - - segmentManagementTasks.clear(); - - size.set(0L); - - logger.trace("CLSM done with closing and clearing existing commit log segments."); - } - - // Used by tests only. - void awaitManagementTasksCompletion() - { - while (!segmentManagementTasks.isEmpty()) - Thread.yield(); - // The last management task is not yet complete. Wait a while for it. - Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); - // TODO: If this functionality is required by anything other than tests, signalling must be used to ensure - // waiting completes correctly. 
- } - - private static void closeAndDeleteSegmentUnsafe(CommitLogSegment segment, boolean delete) - { - try - { - segment.discard(delete); - } - catch (AssertionError ignored) - { - // segment file does not exist - } - } - - /** - * Initiates the shutdown process for the management thread. - */ - public void shutdown() - { - run = false; - wakeManager(); - } - - /** - * Returns when the management thread terminates. - */ - public void awaitTermination() throws InterruptedException - { - managerThread.join(); - - for (CommitLogSegment segment : activeSegments) - segment.close(); - - for (CommitLogSegment segment : availableSegments) - segment.close(); - - FileDirectSegment.shutdown(); - } - - /** - * @return a read-only collection of the active commit log segments - */ - @VisibleForTesting - public Collection<CommitLogSegment> getActiveSegments() - { - return Collections.unmodifiableCollection(activeSegments); - } - -} - http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java new file mode 100644 index 0000000..5c6fd3f --- /dev/null +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.commitlog; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.RateLimiter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.commitlog.CommitLogSegment.CDCState; +import org.apache.cassandra.exceptions.WriteTimeoutException; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.utils.DirectorySizeCalculator; + +public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager +{ + static final Logger logger = LoggerFactory.getLogger(CommitLogSegmentManagerCDC.class); + private final CDCSizeTracker cdcSizeTracker; + + public CommitLogSegmentManagerCDC(final CommitLog commitLog, String storageDirectory) + { + super(commitLog, storageDirectory); + cdcSizeTracker = new CDCSizeTracker(this, new File(DatabaseDescriptor.getCDCLogLocation())); + } + + @Override + void start() + { + super.start(); + cdcSizeTracker.start(); + } + + public void discard(CommitLogSegment segment, boolean delete) + { + segment.close(); + addSize(-segment.onDiskSize()); + + cdcSizeTracker.processDiscardedSegment(segment); + + if (segment.getCDCState() == CDCState.CONTAINS) + FileUtils.renameWithConfirm(segment.logFile.getAbsolutePath(), 
DatabaseDescriptor.getCDCLogLocation() + File.separator + segment.logFile.getName()); + else + { + if (delete) + FileUtils.deleteWithConfirm(segment.logFile); + } + } + + /** + * Initiates the shutdown process for the management thread. Also stops the cdc on-disk size calculator executor. + */ + public void shutdown() + { + run = false; + cdcSizeTracker.shutdown(); + wakeManager(); + } + + /** + * Reserve space in the current segment for the provided mutation or, if there isn't space available, + * create a new segment. For CDC mutations, allocation is expected to throw WTE if the segment disallows CDC mutations. + * + * @param mutation Mutation to allocate in segment manager + * @param size total size (overhead + serialized) of mutation + * @return the created Allocation object + * @throws WriteTimeoutException If segment disallows CDC mutations, we throw WTE + */ + @Override + public CommitLogSegment.Allocation allocate(Mutation mutation, int size) throws WriteTimeoutException + { + CommitLogSegment segment = allocatingFrom(); + CommitLogSegment.Allocation alloc; + + throwIfForbidden(mutation, segment); + while ( null == (alloc = segment.allocate(mutation, size)) ) + { + // Failed to allocate, so move to a new segment with enough room if possible. + advanceAllocatingFrom(segment); + segment = allocatingFrom; + + throwIfForbidden(mutation, segment); + } + + if (mutation.trackedByCDC()) + segment.setCDCState(CDCState.CONTAINS); + + return alloc; + } + + private void throwIfForbidden(Mutation mutation, CommitLogSegment segment) throws WriteTimeoutException + { + if (mutation.trackedByCDC() && segment.getCDCState() == CDCState.FORBIDDEN) + { + cdcSizeTracker.submitOverflowSizeRecalculation(); + throw new WriteTimeoutException(WriteType.CDC, ConsistencyLevel.LOCAL_ONE, 0, 1); + } + } + + /** + * Move files to cdc_raw after replay, since recovery will flush to SSTable and these mutations won't be available + * in the CL subsystem otherwise. 
+ */ + void handleReplayedSegment(final File file) + { + logger.trace("Moving (Unopened) segment {} to cdc_raw directory after replay", file); + FileUtils.renameWithConfirm(file.getAbsolutePath(), DatabaseDescriptor.getCDCLogLocation() + File.separator + file.getName()); + cdcSizeTracker.addFlushedSize(file.length()); + } + + /** + * On segment creation, flag whether the segment should accept CDC mutations or not based on the total currently + * allocated unflushed CDC segments and the contents of cdc_raw + */ + public CommitLogSegment createSegment() + { + CommitLogSegment segment = CommitLogSegment.createSegment(commitLog, this, () -> wakeManager()); + cdcSizeTracker.processNewSegment(segment); + return segment; + } + + /** + * Tracks total disk usage of CDC subsystem, defined by the summation of all unflushed CommitLogSegments with CDC + * data in them and all segments archived into cdc_raw. + * + * Allows atomic increment/decrement of the unflushed size; the flushed size, however, can only be incremented, and a + * full directory walk is required to detect any deletions made by the CDC consumer. + * + * TODO: Linux performs approximately 25% better with the following one-liner instead of this walker: + * Arrays.stream(path.listFiles()).mapToLong(File::length).sum(); + * However this solution is 375% slower on Windows. 
Revisit this and split logic to per-OS + */ + private class CDCSizeTracker extends DirectorySizeCalculator + { + private final RateLimiter rateLimiter = RateLimiter.create(1000 / DatabaseDescriptor.getCDCDiskCheckInterval()); + private ExecutorService cdcSizeCalculationExecutor; + private CommitLogSegmentManagerCDC segmentManager; + private AtomicLong unflushedCDCSize = new AtomicLong(0); + + CDCSizeTracker(CommitLogSegmentManagerCDC segmentManager, File path) + { + super(path); + this.segmentManager = segmentManager; + } + + /** + * Needed for stop/restart during unit tests + */ + public void start() + { + cdcSizeCalculationExecutor = new ThreadPoolExecutor(1, 1, 1000, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadPoolExecutor.DiscardPolicy()); + } + + /** + * Synchronous size recalculation on each segment creation/deletion call could lead to very long delays in new + * segment allocation, thus long delays in thread signaling to wake waiting allocation / writer threads. + * + * This can be reached either from the segment management thread in AbstractCommitLogSegmentManager or from the + * size recalculation executor, so we synchronize on the segment to narrow the window in which concurrent + * updates can leave the tracked size inaccurate. + * + * Reference DirectorySizerBench for more information about performance of the directory size recalc. + */ + void processNewSegment(CommitLogSegment segment) + { + // See synchronization in CommitLogSegment.setCDCState + synchronized(segment) + { + segment.setCDCState(defaultSegmentSize() + totalCDCSizeOnDisk() > allowableCDCBytes() + ? CDCState.FORBIDDEN + : CDCState.PERMITTED); + if (segment.getCDCState() == CDCState.PERMITTED) + unflushedCDCSize.addAndGet(defaultSegmentSize()); + } + + // Take this opportunity to kick off a recalc to pick up any consumer file deletion. 
+ submitOverflowSizeRecalculation(); + } + + void processDiscardedSegment(CommitLogSegment segment) + { + // See synchronization in CommitLogSegment.setCDCState + synchronized(segment) + { + // Add to flushed size before decrementing unflushed so we don't have a window of false generosity + if (segment.getCDCState() == CDCState.CONTAINS) + size.addAndGet(segment.onDiskSize()); + if (segment.getCDCState() != CDCState.FORBIDDEN) + unflushedCDCSize.addAndGet(-defaultSegmentSize()); + } + + // Take this opportunity to kick off a recalc to pick up any consumer file deletion. + submitOverflowSizeRecalculation(); + } + + private long allowableCDCBytes() + { + return (long)DatabaseDescriptor.getCDCSpaceInMB() * 1024 * 1024; + } + + public void submitOverflowSizeRecalculation() + { + try + { + cdcSizeCalculationExecutor.submit(() -> recalculateOverflowSize()); + } + catch (RejectedExecutionException e) + { + // Do nothing. Means we have one in flight so this req. should be satisfied when it completes. + } + } + + private void recalculateOverflowSize() + { + rateLimiter.acquire(); + calculateSize(); + CommitLogSegment allocatingFrom = segmentManager.allocatingFrom; + if (allocatingFrom.getCDCState() == CDCState.FORBIDDEN) + processNewSegment(allocatingFrom); + } + + private int defaultSegmentSize() + { + return DatabaseDescriptor.getCommitLogSegmentSize(); + } + + private void calculateSize() + { + try + { + // Since we don't synchronize around either rebuilding our file list or walking the tree and adding to + // size, it's possible we could have changes take place underneath us and end up with a slightly incorrect + // view of our flushed size by the time this walking completes. 
Given that there's a linear growth in + // runtime on both rebuildFileList and walkFileTree (about 50% for each one on runtime), and that the + // window for this race should be very small, this is an acceptable trade-off since it will be resolved + // on the next segment creation / deletion with a subsequent call to submitOverflowSizeRecalculation. + rebuildFileList(); + Files.walkFileTree(path.toPath(), this); + } + catch (IOException ie) + { + CommitLog.instance.handleCommitError("Failed CDC Size Calculation", ie); + } + } + + private long addFlushedSize(long toAdd) + { + return size.addAndGet(toAdd); + } + + private long totalCDCSizeOnDisk() + { + return unflushedCDCSize.get() + size.get(); + } + + public void shutdown() + { + cdcSizeCalculationExecutor.shutdown(); + } + } + + /** + * Only use for testing / validation that size tracker is working. Not for production use. + */ + @VisibleForTesting + public long updateCDCTotalSize() + { + cdcSizeTracker.submitOverflowSizeRecalculation(); + + // Give the update time to run + try + { + Thread.sleep(DatabaseDescriptor.getCDCDiskCheckInterval() + 10); + } + catch (InterruptedException e) {} + + return cdcSizeTracker.totalCDCSizeOnDisk(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerStandard.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerStandard.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerStandard.java new file mode 100644 index 0000000..333077c --- /dev/null +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerStandard.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.commitlog; + +import java.io.File; + +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.io.util.FileUtils; + +public class CommitLogSegmentManagerStandard extends AbstractCommitLogSegmentManager +{ + public CommitLogSegmentManagerStandard(final CommitLog commitLog, String storageDirectory) + { + super(commitLog, storageDirectory); + } + + public void discard(CommitLogSegment segment, boolean delete) + { + segment.close(); + if (delete) + FileUtils.deleteWithConfirm(segment.logFile); + addSize(-segment.onDiskSize()); + } + + /** + * Initiates the shutdown process for the management thread. + */ + public void shutdown() + { + run = false; + wakeManager(); + } + + /** + * Reserve space in the current segment for the provided mutation or, if there isn't space available, + * create a new segment. 
allocate() is blocking until allocation succeeds as it waits on a signal in advanceAllocatingFrom + * + * @param mutation mutation to allocate space for + * @param size total size of mutation (overhead + serialized size) + * @return the provided Allocation object + */ + public CommitLogSegment.Allocation allocate(Mutation mutation, int size) + { + CommitLogSegment segment = allocatingFrom(); + + CommitLogSegment.Allocation alloc; + while ( null == (alloc = segment.allocate(mutation, size)) ) + { + // failed to allocate, so move to a new segment with enough room + advanceAllocatingFrom(segment); + segment = allocatingFrom; + } + + return alloc; + } + + /** + * Simply delete untracked segment files w/standard, as it'll be flushed to sstables during recovery + * + * @param file segment file that is no longer in use. + */ + void handleReplayedSegment(final File file) + { + // (don't decrease managed size, since this was never a "live" segment) + logger.trace("(Unopened) segment {} is no longer needed and will be deleted now", file); + FileUtils.deleteWithConfirm(file); + } + + public CommitLogSegment createSegment() + { + return CommitLogSegment.createSegment(commitLog, this, () -> wakeManager()); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentReader.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentReader.java new file mode 100644 index 0000000..b547131 --- /dev/null +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentReader.java @@ -0,0 +1,366 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.commitlog; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Iterator; +import java.util.zip.CRC32; +import javax.crypto.Cipher; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.AbstractIterator; + +import org.apache.cassandra.db.commitlog.EncryptedFileSegmentInputStream.ChunkProvider; +import org.apache.cassandra.db.commitlog.CommitLogReadHandler.*; +import org.apache.cassandra.io.FSReadError; +import org.apache.cassandra.io.compress.ICompressor; +import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.io.util.FileSegmentInputStream; +import org.apache.cassandra.io.util.RandomAccessReader; +import org.apache.cassandra.schema.CompressionParams; +import org.apache.cassandra.security.EncryptionUtils; +import org.apache.cassandra.security.EncryptionContext; +import org.apache.cassandra.utils.ByteBufferUtil; + +import static org.apache.cassandra.db.commitlog.CommitLogSegment.SYNC_MARKER_SIZE; +import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt; + +/** + * Read each sync section of a commit log, iteratively. 
+ */ +public class CommitLogSegmentReader implements Iterable<CommitLogSegmentReader.SyncSegment> +{ + private final CommitLogReadHandler handler; + private final CommitLogDescriptor descriptor; + private final RandomAccessReader reader; + private final Segmenter segmenter; + private final boolean tolerateTruncation; + + /** + * ending position of the current sync section. + */ + protected int end; + + protected CommitLogSegmentReader(CommitLogReadHandler handler, + CommitLogDescriptor descriptor, + RandomAccessReader reader, + boolean tolerateTruncation) + { + this.handler = handler; + this.descriptor = descriptor; + this.reader = reader; + this.tolerateTruncation = tolerateTruncation; + + end = (int) reader.getFilePointer(); + if (descriptor.getEncryptionContext().isEnabled()) + segmenter = new EncryptedSegmenter(descriptor, reader); + else if (descriptor.compression != null) + segmenter = new CompressedSegmenter(descriptor, reader); + else + segmenter = new NoOpSegmenter(reader); + } + + public Iterator<SyncSegment> iterator() + { + return new SegmentIterator(); + } + + protected class SegmentIterator extends AbstractIterator<CommitLogSegmentReader.SyncSegment> + { + protected SyncSegment computeNext() + { + while (true) + { + try + { + final int currentStart = end; + end = readSyncMarker(descriptor, currentStart, reader); + if (end == -1) + { + return endOfData(); + } + if (end > reader.length()) + { + // the CRC was good (meaning it was good when it was written and still looks legit), but the file is truncated now. 
+ // try to grab and use as much of the file as possible, which might be nothing if the end of the file truly is corrupt + end = (int) reader.length(); + } + return segmenter.nextSegment(currentStart + SYNC_MARKER_SIZE, end); + } + catch(CommitLogSegmentReader.SegmentReadException e) + { + try + { + handler.handleUnrecoverableError(new CommitLogReadException( + e.getMessage(), + CommitLogReadErrorReason.UNRECOVERABLE_DESCRIPTOR_ERROR, + !e.invalidCrc && tolerateTruncation)); + } + catch (IOException ioe) + { + throw new RuntimeException(ioe); + } + } + catch (IOException e) + { + try + { + boolean tolerateErrorsInSection = tolerateTruncation & segmenter.tolerateSegmentErrors(end, reader.length()); + // if no exception is thrown, the while loop will continue + handler.handleUnrecoverableError(new CommitLogReadException( + e.getMessage(), + CommitLogReadErrorReason.UNRECOVERABLE_DESCRIPTOR_ERROR, + tolerateErrorsInSection)); + } + catch (IOException ioe) + { + throw new RuntimeException(ioe); + } + } + } + } + } + + private int readSyncMarker(CommitLogDescriptor descriptor, int offset, RandomAccessReader reader) throws IOException + { + if (offset > reader.length() - SYNC_MARKER_SIZE) + { + // There was no room in the segment to write a final header. No data could be present here. + return -1; + } + reader.seek(offset); + CRC32 crc = new CRC32(); + updateChecksumInt(crc, (int) (descriptor.id & 0xFFFFFFFFL)); + updateChecksumInt(crc, (int) (descriptor.id >>> 32)); + updateChecksumInt(crc, (int) reader.getPosition()); + final int end = reader.readInt(); + long filecrc = reader.readInt() & 0xffffffffL; + if (crc.getValue() != filecrc) + { + if (end != 0 || filecrc != 0) + { + String msg = String.format("Encountered bad header at position %d of commit log %s, with invalid CRC. 
" + + "The end of segment marker should be zero.", offset, reader.getPath()); + throw new SegmentReadException(msg, true); + } + return -1; + } + else if (end < offset || end > reader.length()) + { + String msg = String.format("Encountered bad header at position %d of commit log %s, with bad position but valid CRC", offset, reader.getPath()); + throw new SegmentReadException(msg, false); + } + return end; + } + + public static class SegmentReadException extends IOException + { + public final boolean invalidCrc; + + public SegmentReadException(String msg, boolean invalidCrc) + { + super(msg); + this.invalidCrc = invalidCrc; + } + } + + public static class SyncSegment + { + /** the 'buffer' to replay commit log data from */ + public final FileDataInput input; + + /** offset in file where this section begins. */ + public final int fileStartPosition; + + /** offset in file where this section ends. */ + public final int fileEndPosition; + + /** the logical ending position of the buffer */ + public final int endPosition; + + public final boolean toleratesErrorsInSection; + + public SyncSegment(FileDataInput input, int fileStartPosition, int fileEndPosition, int endPosition, boolean toleratesErrorsInSection) + { + this.input = input; + this.fileStartPosition = fileStartPosition; + this.fileEndPosition = fileEndPosition; + this.endPosition = endPosition; + this.toleratesErrorsInSection = toleratesErrorsInSection; + } + } + + /** + * Derives the next section of the commit log to be replayed. Section boundaries are derived from the commit log sync markers. + */ + interface Segmenter + { + /** + * Get the next section of the commit log to replay. 
+ * + * @param startPosition the position in the file to begin reading at + * @param nextSectionStartPosition the file position of the beginning of the next section + * @return the buffer and it's logical end position + * @throws IOException + */ + SyncSegment nextSegment(int startPosition, int nextSectionStartPosition) throws IOException; + + /** + * Determine if we tolerate errors in the current segment. + */ + default boolean tolerateSegmentErrors(int segmentEndPosition, long fileLength) + { + return segmentEndPosition >= fileLength || segmentEndPosition < 0; + } + } + + static class NoOpSegmenter implements Segmenter + { + private final RandomAccessReader reader; + + public NoOpSegmenter(RandomAccessReader reader) + { + this.reader = reader; + } + + public SyncSegment nextSegment(int startPosition, int nextSectionStartPosition) + { + reader.seek(startPosition); + return new SyncSegment(reader, startPosition, nextSectionStartPosition, nextSectionStartPosition, true); + } + + public boolean tolerateSegmentErrors(int end, long length) + { + return true; + } + } + + static class CompressedSegmenter implements Segmenter + { + private final ICompressor compressor; + private final RandomAccessReader reader; + private byte[] compressedBuffer; + private byte[] uncompressedBuffer; + private long nextLogicalStart; + + public CompressedSegmenter(CommitLogDescriptor desc, RandomAccessReader reader) + { + this(CompressionParams.createCompressor(desc.compression), reader); + } + + public CompressedSegmenter(ICompressor compressor, RandomAccessReader reader) + { + this.compressor = compressor; + this.reader = reader; + compressedBuffer = new byte[0]; + uncompressedBuffer = new byte[0]; + nextLogicalStart = reader.getFilePointer(); + } + + public SyncSegment nextSegment(final int startPosition, final int nextSectionStartPosition) throws IOException + { + reader.seek(startPosition); + int uncompressedLength = reader.readInt(); + + int compressedLength = nextSectionStartPosition 
- (int)reader.getPosition(); + if (compressedLength > compressedBuffer.length) + compressedBuffer = new byte[(int) (1.2 * compressedLength)]; + reader.readFully(compressedBuffer, 0, compressedLength); + + if (uncompressedLength > uncompressedBuffer.length) + uncompressedBuffer = new byte[(int) (1.2 * uncompressedLength)]; + int count = compressor.uncompress(compressedBuffer, 0, compressedLength, uncompressedBuffer, 0); + nextLogicalStart += SYNC_MARKER_SIZE; + FileDataInput input = new FileSegmentInputStream(ByteBuffer.wrap(uncompressedBuffer, 0, count), reader.getPath(), nextLogicalStart); + nextLogicalStart += uncompressedLength; + return new SyncSegment(input, startPosition, nextSectionStartPosition, (int)nextLogicalStart, tolerateSegmentErrors(nextSectionStartPosition, reader.length())); + } + } + + static class EncryptedSegmenter implements Segmenter + { + private final RandomAccessReader reader; + private final ICompressor compressor; + private final Cipher cipher; + + /** + * the result of the decryption is written into this buffer. + */ + private ByteBuffer decryptedBuffer; + + /** + * the result of the decryption is written into this buffer. 
+ */ + private ByteBuffer uncompressedBuffer; + + private final ChunkProvider chunkProvider; + + private long currentSegmentEndPosition; + private long nextLogicalStart; + + public EncryptedSegmenter(CommitLogDescriptor descriptor, RandomAccessReader reader) + { + this(reader, descriptor.getEncryptionContext()); + } + + @VisibleForTesting + EncryptedSegmenter(final RandomAccessReader reader, EncryptionContext encryptionContext) + { + this.reader = reader; + decryptedBuffer = ByteBuffer.allocate(0); + compressor = encryptionContext.getCompressor(); + nextLogicalStart = reader.getFilePointer(); + + try + { + cipher = encryptionContext.getDecryptor(); + } + catch (IOException ioe) + { + throw new FSReadError(ioe, reader.getPath()); + } + + chunkProvider = () -> { + if (reader.getFilePointer() >= currentSegmentEndPosition) + return ByteBufferUtil.EMPTY_BYTE_BUFFER; + try + { + decryptedBuffer = EncryptionUtils.decrypt(reader, decryptedBuffer, true, cipher); + uncompressedBuffer = EncryptionUtils.uncompress(decryptedBuffer, uncompressedBuffer, true, compressor); + return uncompressedBuffer; + } + catch (IOException e) + { + throw new FSReadError(e, reader.getPath()); + } + }; + } + + public SyncSegment nextSegment(int startPosition, int nextSectionStartPosition) throws IOException + { + int totalPlainTextLength = reader.readInt(); + currentSegmentEndPosition = nextSectionStartPosition - 1; + + nextLogicalStart += SYNC_MARKER_SIZE; + FileDataInput input = new EncryptedFileSegmentInputStream(reader.getPath(), nextLogicalStart, 0, totalPlainTextLength, chunkProvider); + nextLogicalStart += totalPlainTextLength; + return new SyncSegment(input, startPosition, nextSectionStartPosition, (int)nextLogicalStart, tolerateSegmentErrors(nextSectionStartPosition, reader.length())); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java index 684fc2c..e44dfdf 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java @@ -44,9 +44,9 @@ public class CompressedSegment extends FileDirectSegment /** * Constructs a new segment file. */ - CompressedSegment(CommitLog commitLog, Runnable onClose) + CompressedSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager, Runnable onClose) { - super(commitLog, onClose); + super(commitLog, manager, onClose); this.compressor = commitLog.configuration.getCompressor(); } @@ -57,7 +57,7 @@ public class CompressedSegment extends FileDirectSegment ByteBuffer createBuffer(CommitLog commitLog) { - return createBuffer(commitLog.configuration.getCompressor().preferredBufferType()); + return manager.getBufferPool().createBuffer(commitLog.configuration.getCompressor().preferredBufferType()); } @Override @@ -71,13 +71,13 @@ public class CompressedSegment extends FileDirectSegment try { int neededBufferSize = compressor.initialCompressedBufferLength(length) + COMPRESSED_MARKER_SIZE; - ByteBuffer compressedBuffer = reusableBufferHolder.get(); + ByteBuffer compressedBuffer = manager.getBufferPool().getThreadLocalReusableBuffer(); if (compressor.preferredBufferType() != BufferType.typeOf(compressedBuffer) || compressedBuffer.capacity() < neededBufferSize) { FileUtils.clean(compressedBuffer); compressedBuffer = allocate(neededBufferSize); - reusableBufferHolder.set(compressedBuffer); + manager.getBufferPool().setThreadLocalReusableBuffer(compressedBuffer); } ByteBuffer inputBuffer = buffer.duplicate(); @@ -91,7 +91,7 @@ public class CompressedSegment extends FileDirectSegment // Only one thread can be here at a given time. 
// Protected by synchronization on CommitLogSegment.sync(). writeSyncMarker(compressedBuffer, 0, (int) channel.position(), (int) channel.position() + compressedBuffer.remaining()); - commitLog.allocator.addSize(compressedBuffer.limit()); + manager.addSize(compressedBuffer.limit()); channel.write(compressedBuffer); assert channel.position() - lastWrittenPos == compressedBuffer.limit(); lastWrittenPos = channel.position(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java b/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java index c34a365..e13b20a 100644 --- a/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java @@ -65,9 +65,9 @@ public class EncryptedSegment extends FileDirectSegment private final EncryptionContext encryptionContext; private final Cipher cipher; - public EncryptedSegment(CommitLog commitLog, Runnable onClose) + public EncryptedSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager, Runnable onClose) { - super(commitLog, onClose); + super(commitLog, manager, onClose); this.encryptionContext = commitLog.configuration.getEncryptionContext(); try @@ -90,9 +90,9 @@ public class EncryptedSegment extends FileDirectSegment ByteBuffer createBuffer(CommitLog commitLog) { - //Note: we want to keep the compression buffers on-heap as we need those bytes for encryption, + // Note: we want to keep the compression buffers on-heap as we need those bytes for encryption, // and we want to avoid copying from off-heap (compression buffer) to on-heap encryption APIs - return createBuffer(BufferType.ON_HEAP); + return manager.getBufferPool().createBuffer(BufferType.ON_HEAP); } void write(int startMarker, int nextMarker) @@ -108,7 +108,7 @@ 
public class EncryptedSegment extends FileDirectSegment { ByteBuffer inputBuffer = buffer.duplicate(); inputBuffer.limit(contentStart + length).position(contentStart); - ByteBuffer buffer = reusableBufferHolder.get(); + ByteBuffer buffer = manager.getBufferPool().getThreadLocalReusableBuffer(); // save space for the sync marker at the beginning of this section final long syncMarkerPosition = lastWrittenPos; @@ -127,7 +127,7 @@ public class EncryptedSegment extends FileDirectSegment buffer = EncryptionUtils.encryptAndWrite(buffer, channel, true, cipher); contentStart += nextBlockSize; - commitLog.allocator.addSize(buffer.limit() + ENCRYPTED_BLOCK_HEADER_SIZE); + manager.addSize(buffer.limit() + ENCRYPTED_BLOCK_HEADER_SIZE); } lastWrittenPos = channel.position(); @@ -138,15 +138,15 @@ public class EncryptedSegment extends FileDirectSegment writeSyncMarker(buffer, 0, (int) syncMarkerPosition, (int) lastWrittenPos); buffer.putInt(SYNC_MARKER_SIZE, length); buffer.position(0).limit(ENCRYPTED_SECTION_HEADER_SIZE); - commitLog.allocator.addSize(buffer.limit()); + manager.addSize(buffer.limit()); channel.position(syncMarkerPosition); channel.write(buffer); SyncUtil.force(channel, true); - if (reusableBufferHolder.get().capacity() < buffer.capacity()) - reusableBufferHolder.set(buffer); + if (manager.getBufferPool().getThreadLocalReusableBuffer().capacity() < buffer.capacity()) + manager.getBufferPool().setThreadLocalReusableBuffer(buffer); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java b/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java index 50f9efd..d4160e4 100644 --- a/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java +++ 
b/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java @@ -19,15 +19,8 @@ package org.apache.cassandra.db.commitlog; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicInteger; -import io.netty.util.concurrent.FastThreadLocal; -import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.FSWriteError; -import org.apache.cassandra.io.compress.BufferType; -import org.apache.cassandra.io.util.FileUtils; /** * Writes to the backing commit log file only on sync, allowing transformations of the mutations, @@ -35,45 +28,23 @@ import org.apache.cassandra.io.util.FileUtils; */ public abstract class FileDirectSegment extends CommitLogSegment { - protected static final FastThreadLocal<ByteBuffer> reusableBufferHolder = new FastThreadLocal<ByteBuffer>() - { - protected ByteBuffer initialValue() - { - return ByteBuffer.allocate(0); - } - }; - - static Queue<ByteBuffer> bufferPool = new ConcurrentLinkedQueue<>(); - - /** - * Maximum number of buffers in the compression pool. The default value is 3, it should not be set lower than that - * (one segment in compression, one written to, one in reserve); delays in compression may cause the log to use - * more, depending on how soon the sync policy stops all writing threads. 
- */ - static final int MAX_BUFFERPOOL_SIZE = DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool(); - - /** - * The number of buffers in use - */ - private static AtomicInteger usedBuffers = new AtomicInteger(0); - volatile long lastWrittenPos = 0; - private final Runnable onClose; - FileDirectSegment(CommitLog commitLog, Runnable onClose) + FileDirectSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager, Runnable onClose) { - super(commitLog); + super(commitLog, manager); this.onClose = onClose; } + @Override void writeLogHeader() { super.writeLogHeader(); try { channel.write((ByteBuffer) buffer.duplicate().flip()); - commitLog.allocator.addSize(lastWrittenPos = buffer.position()); + manager.addSize(lastWrittenPos = buffer.position()); } catch (IOException e) { @@ -81,30 +52,12 @@ public abstract class FileDirectSegment extends CommitLogSegment } } - ByteBuffer createBuffer(BufferType bufferType) - { - usedBuffers.incrementAndGet(); - ByteBuffer buf = bufferPool.poll(); - if (buf != null) - { - buf.clear(); - return buf; - } - - return bufferType.allocate(DatabaseDescriptor.getCommitLogSegmentSize()); - } - @Override protected void internalClose() { - usedBuffers.decrementAndGet(); - try { - if (bufferPool.size() < MAX_BUFFERPOOL_SIZE) - bufferPool.add(buffer); - else - FileUtils.clean(buffer); + manager.getBufferPool().releaseBuffer(buffer); super.internalClose(); } finally @@ -112,20 +65,4 @@ public abstract class FileDirectSegment extends CommitLogSegment onClose.run(); } } - - static void shutdown() - { - bufferPool.clear(); - } - - /** - * Checks if the number of buffers in use is greater or equals to the maximum number of buffers allowed in the pool. - * - * @return <code>true</code> if the number of buffers in use is greater or equals to the maximum number of buffers - * allowed in the pool, <code>false</code> otherwise. 
- */ - static boolean hasReachedPoolLimit() - { - return usedBuffers.get() >= MAX_BUFFERPOOL_SIZE; - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java b/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java index 3fdf886..2bbd12d 100644 --- a/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java @@ -41,9 +41,9 @@ public class MemoryMappedSegment extends CommitLogSegment * * @param commitLog the commit log it will be used with. */ - MemoryMappedSegment(CommitLog commitLog) + MemoryMappedSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager) { - super(commitLog); + super(commitLog, manager); // mark the initial sync marker as uninitialised int firstSync = buffer.position(); buffer.putInt(firstSync + 0, 0); @@ -66,7 +66,7 @@ public class MemoryMappedSegment extends CommitLogSegment { throw new FSWriteError(e, logFile); } - commitLog.allocator.addSize(DatabaseDescriptor.getCommitLogSegmentSize()); + manager.addSize(DatabaseDescriptor.getCommitLogSegmentSize()); return channel.map(FileChannel.MapMode.READ_WRITE, 0, DatabaseDescriptor.getCommitLogSegmentSize()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java b/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java deleted file mode 100644 index 0b21763..0000000 --- a/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java +++ /dev/null @@ -1,178 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more 
contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db.commitlog; - -import java.io.IOException; -import java.util.Map; -import java.util.NavigableMap; -import java.util.TreeMap; - -import com.google.common.collect.Ordering; - -import org.apache.cassandra.db.TypeSizes; -import org.apache.cassandra.io.ISerializer; -import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.io.util.DataInputPlus; -import org.apache.cassandra.io.util.DataOutputPlus; - -public class ReplayPosition implements Comparable<ReplayPosition> -{ - public static final ReplayPositionSerializer serializer = new ReplayPositionSerializer(); - - // NONE is used for SSTables that are streamed from other nodes and thus have no relationship - // with our local commitlog. 
The values satisfy the criteria that - // - no real commitlog segment will have the given id - // - it will sort before any real replayposition, so it will be effectively ignored by getReplayPosition - public static final ReplayPosition NONE = new ReplayPosition(-1, 0); - - public final long segment; - public final int position; - - /** - * A filter of known safe-to-discard commit log replay positions, based on - * the range covered by on disk sstables and those prior to the most recent truncation record - */ - public static class ReplayFilter - { - final NavigableMap<ReplayPosition, ReplayPosition> persisted = new TreeMap<>(); - public ReplayFilter(Iterable<SSTableReader> onDisk, ReplayPosition truncatedAt) - { - for (SSTableReader reader : onDisk) - { - ReplayPosition start = reader.getSSTableMetadata().commitLogLowerBound; - ReplayPosition end = reader.getSSTableMetadata().commitLogUpperBound; - add(persisted, start, end); - } - if (truncatedAt != null) - add(persisted, ReplayPosition.NONE, truncatedAt); - } - - private static void add(NavigableMap<ReplayPosition, ReplayPosition> ranges, ReplayPosition start, ReplayPosition end) - { - // extend ourselves to cover any ranges we overlap - // record directly preceding our end may extend past us, so take the max of our end and its - Map.Entry<ReplayPosition, ReplayPosition> extend = ranges.floorEntry(end); - if (extend != null && extend.getValue().compareTo(end) > 0) - end = extend.getValue(); - - // record directly preceding our start may extend into us; if it does, we take it as our start - extend = ranges.lowerEntry(start); - if (extend != null && extend.getValue().compareTo(start) >= 0) - start = extend.getKey(); - - ranges.subMap(start, end).clear(); - ranges.put(start, end); - } - - public boolean shouldReplay(ReplayPosition position) - { - // replay ranges are start exclusive, end inclusive - Map.Entry<ReplayPosition, ReplayPosition> range = persisted.lowerEntry(position); - return range == null || 
position.compareTo(range.getValue()) > 0; - } - - public boolean isEmpty() - { - return persisted.isEmpty(); - } - } - - public static ReplayPosition firstNotCovered(Iterable<ReplayFilter> ranges) - { - ReplayPosition min = null; - for (ReplayFilter map : ranges) - { - ReplayPosition first = map.persisted.firstEntry().getValue(); - if (min == null) - min = first; - else - min = Ordering.natural().min(min, first); - } - if (min == null) - return NONE; - return min; - } - - public ReplayPosition(long segment, int position) - { - this.segment = segment; - assert position >= 0; - this.position = position; - } - - public int compareTo(ReplayPosition that) - { - if (this.segment != that.segment) - return Long.compare(this.segment, that.segment); - - return Integer.compare(this.position, that.position); - } - - @Override - public boolean equals(Object o) - { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - ReplayPosition that = (ReplayPosition) o; - - if (position != that.position) return false; - return segment == that.segment; - } - - @Override - public int hashCode() - { - int result = (int) (segment ^ (segment >>> 32)); - result = 31 * result + position; - return result; - } - - @Override - public String toString() - { - return "ReplayPosition(" + - "segmentId=" + segment + - ", position=" + position + - ')'; - } - - public ReplayPosition clone() - { - return new ReplayPosition(segment, position); - } - - public static class ReplayPositionSerializer implements ISerializer<ReplayPosition> - { - public void serialize(ReplayPosition rp, DataOutputPlus out) throws IOException - { - out.writeLong(rp.segment); - out.writeInt(rp.position); - } - - public ReplayPosition deserialize(DataInputPlus in) throws IOException - { - return new ReplayPosition(in.readLong(), in.readInt()); - } - - public long serializedSize(ReplayPosition rp) - { - return TypeSizes.sizeof(rp.segment) + TypeSizes.sizeof(rp.position); - } - } -}