Author: jbellis
Date: Fri Dec 2 23:30:32 2011
New Revision: 1209779
URL: http://svn.apache.org/viewvc?rev=1209779&view=rev
Log:
fix commitlog segment recycling
patch by Rick Branson; reviewed by jbellis for CASSANDRA-3557
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
cassandra/trunk/test/unit/org/apache/cassandra/SchemaLoader.java
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1209779&r1=1209778&r2=1209779&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Fri Dec 2 23:30:32 2011
@@ -1,6 +1,7 @@
1.1-dev
* "defragment" rows for name-based queries under STCS, again (CASSANDRA-2503)
- * Recycle commitlog segments for improved performance (CASSANDRA-3411, 3543)
+ * Recycle commitlog segments for improved performance
+ (CASSANDRA-3411, 3543, 3557)
* update size-tiered compaction to prioritize small tiers (CASSANDRA-2407)
* add message expiration logic to OutboundTcpConnection (CASSANDRA-3005)
* off-heap cache to use sun.misc.Unsafe instead of JNA (CASSANDRA-3271)
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=1209779&r1=1209778&r2=1209779&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Fri Dec 2 23:30:32 2011
@@ -65,7 +65,7 @@ public class CommitLog implements Commit
private final ICommitLogExecutorService executor;
- private final CommitLogAllocator allocator;
+ public final CommitLogAllocator allocator;
public static final int END_OF_SEGMENT_MARKER = 0; // this is written out at the end of a segment
public static final int END_OF_SEGMENT_MARKER_SIZE = 4; // number of bytes of ^^^
@@ -388,7 +388,7 @@ public class CommitLog implements Commit
*/
public int activeSegments()
{
- return allocator.activeSegments.size();
+ return allocator.getActiveSegments().size();
}
/**
@@ -427,7 +427,7 @@ public class CommitLog implements Commit
// flushed CF as clean, until we reach the segment file containing the ReplayPosition passed
// in the arguments. Any segments that become unused after they are marked clean will be
// recycled or discarded.
- for (Iterator<CommitLogSegment> iter = allocator.activeSegments.iterator(); iter.hasNext(); )
+ for (Iterator<CommitLogSegment> iter = allocator.getActiveSegments().iterator(); iter.hasNext();)
{
CommitLogSegment segment = iter.next();
segment.markClean(cfId, context);
@@ -438,7 +438,6 @@ public class CommitLog implements Commit
if (segment.isUnused() && iter.hasNext())
{
logger.debug("Commit log segment {} is unused", segment);
- iter.remove();
allocator.recycleSegment(segment);
}
else
@@ -477,12 +476,9 @@ public class CommitLog implements Commit
*/
public void sync() throws IOException
{
- for (CommitLogSegment segment : allocator.activeSegments)
+ for (CommitLogSegment segment : allocator.getActiveSegments())
{
- if (segment.needsSync())
- {
- segment.sync();
- }
+ segment.sync();
}
}
@@ -515,12 +511,15 @@ public class CommitLog implements Commit
*/
public void forceNewSegment() throws ExecutionException, InterruptedException
{
+ logger.debug("Forcing new segment creation");
+
Callable<?> task = new Callable()
{
public Object call() throws IOException
{
if (activeSegment.position() > 0)
activateNextSegment();
+
return null;
}
};
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java?rev=1209779&r1=1209778&r2=1209779&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java Fri Dec 2 23:30:32 2011
@@ -22,6 +22,8 @@ import java.io.File;
import java.io.IOError;
import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -58,7 +60,7 @@ public class CommitLogAllocator
private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
/** Active segments, containing unflushed data */
- final ConcurrentLinkedQueue<CommitLogSegment> activeSegments = new ConcurrentLinkedQueue<CommitLogSegment>();
+ private final ConcurrentLinkedQueue<CommitLogSegment> activeSegments = new ConcurrentLinkedQueue<CommitLogSegment>();
/**
* Tracks commitlog size, in multiples of the segment size. We need to do this so we can "promise" size
@@ -113,7 +115,7 @@ public class CommitLogAllocator
/**
* Fetches an empty segment file.
*
- * @return the next writeable segment
+ * @return the next writable segment
*/
public CommitLogSegment fetchSegment()
{
@@ -142,6 +144,8 @@ public class CommitLogAllocator
*/
public void recycleSegment(final CommitLogSegment segment)
{
+ activeSegments.remove(segment);
+
if (isCapExceeded())
{
discardSegment(segment);
@@ -152,7 +156,8 @@ public class CommitLogAllocator
{
public void run()
{
- segment.recycle();
+ CommitLogSegment recycled = segment.recycle();
+ internalAddReadySegment(recycled);
}
});
}
@@ -197,11 +202,11 @@ public class CommitLogAllocator
private void discardSegment(final CommitLogSegment segment)
{
size.addAndGet(-CommitLog.SEGMENT_SIZE);
+
queue.add(new Runnable()
{
public void run()
{
- activeSegments.remove(segment);
segment.discard();
}
});
@@ -253,11 +258,20 @@ public class CommitLogAllocator
return segment;
}
- public boolean isCapExceeded()
+ /**
+ * Check to see if the speculative current size exceeds the cap.
+ *
+ * @return true if cap is exceeded
+ */
+ private boolean isCapExceeded()
{
return size.get() > DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024;
}
+ /**
+ * Throws a flag that enables the behavior of keeping at least one spare segment
+ * available at all times.
+ */
public void enableReserveSegmentCreation()
{
createReserveSegments = true;
@@ -323,5 +337,13 @@ public class CommitLogAllocator
{
allocationThread.join();
}
+
+ /**
+ * @return a read-only collection of the active commit log segments
+ */
+ public Collection<CommitLogSegment> getActiveSegments()
+ {
+ return Collections.unmodifiableCollection(activeSegments);
+ }
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java?rev=1209779&r1=1209778&r2=1209779&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java Fri Dec 2 23:30:32 2011
@@ -119,6 +119,8 @@ public class CommitLogSegment
buffer = logFileAccessor.getChannel().map(FileChannel.MapMode.READ_WRITE, (long) 0, (long) CommitLog.SEGMENT_SIZE);
buffer.putInt(CommitLog.END_OF_SEGMENT_MARKER);
buffer.position(0);
+
+ needsSync = true;
}
catch (IOException e)
{
@@ -178,13 +180,26 @@ public class CommitLogSegment
*
* @return a new CommitLogSegment representing the newly reusable segment.
*/
- public void recycle()
+ public CommitLogSegment recycle()
{
// writes an end-of-segment marker at the very beginning of the file and closes it
buffer.position(0);
buffer.putInt(CommitLog.END_OF_SEGMENT_MARKER);
buffer.position(0);
- needsSync = true;
+
+ try
+ {
+ sync();
+ }
+ catch (IOException e)
+ {
+ // This is a best effort thing anyway
+ logger.warn("I/O error flushing " + this + " " + e);
+ }
+
+ close();
+
+ return new CommitLogSegment(getPath());
}
/**
@@ -253,8 +268,11 @@ public class CommitLogSegment
*/
public void sync() throws IOException
{
- buffer.force();
- needsSync = false;
+ if (needsSync)
+ {
+ buffer.force();
+ needsSync = false;
+ }
}
/**
@@ -346,14 +364,6 @@ public class CommitLogSegment
}
/**
- * @return true if this segment file has unflushed writes
- */
- public boolean needsSync()
- {
- return needsSync;
- }
-
- /**
* Check to see if a certain ReplayPosition is contained by this segment file.
*
* @param context the replay position to be checked
@@ -384,13 +394,6 @@ public class CommitLogSegment
public int position()
{
- try
- {
- return (int) logFileAccessor.getFilePointer();
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
+ return buffer.position();
}
}
Modified: cassandra/trunk/test/unit/org/apache/cassandra/SchemaLoader.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/SchemaLoader.java?rev=1209779&r1=1209778&r2=1209779&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/SchemaLoader.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/SchemaLoader.java Fri Dec 2 23:30:32 2011
@@ -18,9 +18,11 @@
package org.apache.cassandra;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
+import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.utils.ByteBufferUtil;
import com.google.common.base.Charsets;
@@ -44,6 +46,8 @@ public class SchemaLoader
@BeforeClass
public static void loadSchema()
{
+ CommitLog.instance.allocator.enableReserveSegmentCreation();
+
Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler()
{
public void uncaughtException(Thread t, Throwable e)