Author: jbellis
Date: Fri Aug 5 20:19:45 2011
New Revision: 1154369
URL: http://svn.apache.org/viewvc?rev=1154369&view=rev
Log:
refactorings and corner-case bug fixes:
- avoid modifying the List of rows after passing it to a LazilyCompactedRow
- account for possibility that all data compacted by LCR has expired
- consolidate the duplicated shouldPurge purge logic into PrecompactedRow.removeDeletedAndOldShards
patch by jbellis; reviewed by slebresne for CASSANDRA-2901
Modified:
cassandra/branches/cassandra-0.8/CHANGES.txt
cassandra/branches/cassandra-0.8/conf/cassandra.yaml
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1154369&r1=1154368&r2=1154369&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Fri Aug 5 20:19:45 2011
@@ -1,6 +1,7 @@
0.8.4
* include files-to-be-streamed in StreamInSession.getSources (CASSANDRA-2972)
+
0.8.3
* add ability to drop local reads/writes that are going to timeout
(CASSANDRA-2943)
Modified: cassandra/branches/cassandra-0.8/conf/cassandra.yaml
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/conf/cassandra.yaml?rev=1154369&r1=1154368&r2=1154369&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/conf/cassandra.yaml (original)
+++ cassandra/branches/cassandra-0.8/conf/cassandra.yaml Fri Aug 5 20:19:45 2011
@@ -272,13 +272,13 @@ column_index_size_in_kb: 64
# will be logged specifying the row key.
in_memory_compaction_limit_in_mb: 64
-# Number of compaction threads (NOT including validation "compactions"
-# for anti-entropy repair). This default to the number of processors,
-# enabling multiple compactions to execute at once. Using more than one
-# thread is highly recommended to preserve read performance in a mixed
-# read/write workload as this avoids sstables from accumulating during long
-# running compactions. The default is usually fine and if you experience
-# problems with compaction running too slowly or too fast, you should look at
+# Number of simultaneous compactions to allow, NOT including
+# validation "compactions" for anti-entropy repair. This defaults to
+# the number of cores. This can help preserve read performance in a
+# mixed read/write workload, by mitigating the tendency of small
+# sstables to accumulate during a single long-running compaction. The
+# default is usually fine and if you experience problems with
+# compaction running too slowly or too fast, you should look at
# compaction_throughput_mb_per_sec first.
#
# Uncomment to make compaction mono-threaded.
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=1154369&r1=1154368&r2=1154369&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
(original)
+++
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
Fri Aug 5 20:19:45 2011
@@ -46,6 +46,34 @@ import org.slf4j.LoggerFactory;
public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
{
protected static Logger logger =
LoggerFactory.getLogger(DebuggableThreadPoolExecutor.class);
+ public static final RejectedExecutionHandler blockingExecutionHandler =
new RejectedExecutionHandler()
+ {
+ public void rejectedExecution(Runnable task, ThreadPoolExecutor
executor)
+ {
+ ((DebuggableThreadPoolExecutor) executor).onInitialRejection(task);
+ BlockingQueue<Runnable> queue = executor.getQueue();
+ while (true)
+ {
+ if (executor.isShutdown())
+ {
+ ((DebuggableThreadPoolExecutor)
executor).onFinalRejection(task);
+ throw new RejectedExecutionException("ThreadPoolExecutor
has shut down");
+ }
+ try
+ {
+ if (queue.offer(task, 1000, TimeUnit.MILLISECONDS))
+ {
+ ((DebuggableThreadPoolExecutor)
executor).onFinalAccept(task);
+ break;
+ }
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ }
+ }
+ };
public DebuggableThreadPoolExecutor(String threadPoolName, int priority)
{
@@ -67,34 +95,7 @@ public class DebuggableThreadPoolExecuto
// we'll just override this with a handler that retries until it gets
in. ugly, but effective.
// (there is an extensive analysis of the options here at
//
http://today.java.net/pub/a/today/2008/10/23/creating-a-notifying-blocking-thread-pool-executor.html)
- this.setRejectedExecutionHandler(new RejectedExecutionHandler()
- {
- public void rejectedExecution(Runnable task, ThreadPoolExecutor
executor)
- {
-
((DebuggableThreadPoolExecutor)executor).onInitialRejection(task);
- BlockingQueue<Runnable> queue = executor.getQueue();
- while (true)
- {
- if (executor.isShutdown())
- {
-
((DebuggableThreadPoolExecutor)executor).onFinalRejection(task);
- throw new
RejectedExecutionException("ThreadPoolExecutor has shut down");
- }
- try
- {
- if (queue.offer(task, 1000, TimeUnit.MILLISECONDS))
- {
-
((DebuggableThreadPoolExecutor)executor).onFinalAccept(task);
- break;
- }
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
- }
- }
- });
+ this.setRejectedExecutionHandler(blockingExecutionHandler);
}
protected void onInitialRejection(Runnable task) {}
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java?rev=1154369&r1=1154368&r2=1154369&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
(original)
+++
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
Fri Aug 5 20:19:45 2011
@@ -120,7 +120,7 @@ implements Closeable, CompactionInfo.Hol
try
{
- AbstractCompactedRow compactedRow =
controller.getCompactedRow(rows);
+ AbstractCompactedRow compactedRow = controller.getCompactedRow(new
ArrayList<SSTableIdentityIterator>(rows));
if (compactedRow.isEmpty())
{
controller.invalidateCachedRow(compactedRow.key);
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java?rev=1154369&r1=1154368&r2=1154369&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
(original)
+++
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
Fri Aug 5 20:19:45 2011
@@ -25,7 +25,6 @@ import java.io.DataOutput;
import java.io.IOError;
import java.io.IOException;
import java.security.MessageDigest;
-import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -33,7 +32,11 @@ import com.google.common.base.Predicates
import com.google.common.collect.Iterators;
import org.apache.commons.collections.iterators.CollatingIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.columniterator.IColumnIterator;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
import org.apache.cassandra.io.util.DataOutputBuffer;
@@ -53,23 +56,25 @@ import org.apache.cassandra.utils.Reduci
*/
public class LazilyCompactedRow extends AbstractCompactedRow implements
IIterableColumns
{
+ private static Logger logger =
LoggerFactory.getLogger(LazilyCompactedRow.class);
+
private final List<SSTableIdentityIterator> rows;
private final CompactionController controller;
private final boolean shouldPurge;
private final DataOutputBuffer headerBuffer;
private ColumnFamily emptyColumnFamily;
- private LazyColumnIterator iter;
+ private LazyColumnIterator reducer;
private int columnCount;
private long columnSerializedSize;
public LazilyCompactedRow(CompactionController controller,
List<SSTableIdentityIterator> rows)
{
super(rows.get(0).getKey());
+ this.rows = rows;
this.controller = controller;
this.shouldPurge = controller.shouldPurge(key);
- this.rows = new ArrayList<SSTableIdentityIterator>(rows);
- for (SSTableIdentityIterator row : rows)
+ for (IColumnIterator row : rows)
{
ColumnFamily cf = row.getColumnFamily();
@@ -83,9 +88,10 @@ public class LazilyCompactedRow extends
headerBuffer = new DataOutputBuffer();
ColumnIndexer.serialize(this, headerBuffer);
// reach into iterator used by ColumnIndexer to get column count and
size
- columnCount = iter.size;
- columnSerializedSize = iter.serializedSize;
- iter = null;
+ // (however, if there are zero columns, iterator() will not be called
by ColumnIndexer and reducer will be null)
+ columnCount = reducer == null ? 0 : reducer.size;
+ columnSerializedSize = reducer == null ? 0 : reducer.serializedSize;
+ reducer = null;
}
public void write(DataOutput out) throws IOException
@@ -94,6 +100,9 @@ public class LazilyCompactedRow extends
ColumnFamily.serializer().serializeCFInfo(emptyColumnFamily, clockOut);
long dataSize = headerBuffer.getLength() + clockOut.getLength() +
columnSerializedSize;
+ if (logger.isDebugEnabled())
+ logger.debug(String.format("header / clock / column sizes are %s /
%s / %s",
+ headerBuffer.getLength(), clockOut.getLength(),
columnSerializedSize));
assert dataSize > 0;
out.writeLong(dataSize);
out.write(headerBuffer.getData(), 0, headerBuffer.getLength());
@@ -106,6 +115,9 @@ public class LazilyCompactedRow extends
IColumn column = iter.next();
emptyColumnFamily.getColumnSerializer().serialize(column, out);
}
+ long secondPassColumnSize = reducer == null ? 0 :
reducer.serializedSize;
+ assert secondPassColumnSize == columnSerializedSize
+ : "originally calculated column size of " +
columnSerializedSize + " but now it is " + secondPassColumnSize;
}
public void update(MessageDigest digest)
@@ -157,8 +169,8 @@ public class LazilyCompactedRow extends
{
row.reset();
}
- iter = new LazyColumnIterator(new
CollatingIterator(getComparator().columnComparator, rows));
- return Iterators.filter(iter, Predicates.notNull());
+ reducer = new LazyColumnIterator(new
CollatingIterator(getComparator().columnComparator, rows));
+ return Iterators.filter(reducer, Predicates.notNull());
}
public int columnCount()
@@ -190,18 +202,13 @@ public class LazilyCompactedRow extends
protected IColumn getReduced()
{
- assert container != null;
- IColumn reduced = container.iterator().next();
- ColumnFamily purged = shouldPurge ?
ColumnFamilyStore.removeDeleted(container, controller.gcBefore) : container;
- if (shouldPurge && purged != null &&
purged.metadata().getDefaultValidator().isCommutative())
- {
- CounterColumn.removeOldShards(purged, controller.gcBefore);
- }
+ ColumnFamily purged =
PrecompactedRow.removeDeletedAndOldShards(shouldPurge, controller, container);
if (purged == null || !purged.iterator().hasNext())
{
container.clear();
return null;
}
+ IColumn reduced = purged.iterator().next();
container.clear();
serializedSize += reduced.serializedSize();
size++;
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java?rev=1154369&r1=1154368&r2=1154369&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
(original)
+++
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
Fri Aug 5 20:19:45 2011
@@ -36,6 +36,7 @@ import org.apache.cassandra.db.CounterCo
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
import org.apache.cassandra.io.util.DataOutputBuffer;
+import sun.tools.tree.ThisExpression;
/**
* PrecompactedRow merges its rows in its constructor in memory.
@@ -55,11 +56,28 @@ public class PrecompactedRow extends Abs
this.gcBefore = Integer.MAX_VALUE;
}
+ public static ColumnFamily removeDeletedAndOldShards(DecoratedKey key,
CompactionController controller, ColumnFamily cf)
+ {
+ return removeDeletedAndOldShards(controller.shouldPurge(key),
controller, cf);
+ }
+
+ public static ColumnFamily removeDeletedAndOldShards(boolean shouldPurge,
CompactionController controller, ColumnFamily cf)
+ {
+ ColumnFamily compacted = shouldPurge ?
ColumnFamilyStore.removeDeleted(cf, controller.gcBefore) : cf;
+ if (shouldPurge && compacted != null &&
compacted.metadata().getDefaultValidator().isCommutative())
+ CounterColumn.removeOldShards(compacted, controller.gcBefore);
+ return compacted;
+ }
+
public PrecompactedRow(CompactionController controller,
List<SSTableIdentityIterator> rows)
{
super(rows.get(0).getKey());
- this.gcBefore = controller.gcBefore;
+ gcBefore = controller.gcBefore;
+ compactedCf = removeDeletedAndOldShards(rows.get(0).getKey(),
controller, merge(rows));
+ }
+ private static ColumnFamily merge(List<SSTableIdentityIterator> rows)
+ {
ColumnFamily cf = null;
for (SSTableIdentityIterator row : rows)
{
@@ -70,7 +88,7 @@ public class PrecompactedRow extends Abs
}
catch (IOException e)
{
- logger.error("Skipping row " + key + " in " + row.getPath(),
e);
+ logger.error("Skipping row " + row.getKey() + " in " +
row.getPath(), e);
continue;
}
if (cf == null)
@@ -82,45 +100,36 @@ public class PrecompactedRow extends Abs
cf.addAll(thisCF);
}
}
- boolean shouldPurge = controller.shouldPurge(key);
- compactedCf = shouldPurge ? ColumnFamilyStore.removeDeleted(cf,
controller.gcBefore) : cf;
- if (shouldPurge && compactedCf != null &&
compactedCf.metadata().getDefaultValidator().isCommutative())
- {
- CounterColumn.removeOldShards(compactedCf, controller.gcBefore);
- }
+ return cf;
}
public void write(DataOutput out) throws IOException
{
- if (compactedCf != null)
- {
- DataOutputBuffer buffer = new DataOutputBuffer();
- DataOutputBuffer headerBuffer = new DataOutputBuffer();
- ColumnIndexer.serialize(compactedCf, headerBuffer);
- ColumnFamily.serializer().serializeForSSTable(compactedCf, buffer);
- out.writeLong(headerBuffer.getLength() + buffer.getLength());
- out.write(headerBuffer.getData(), 0, headerBuffer.getLength());
- out.write(buffer.getData(), 0, buffer.getLength());
- }
+ assert compactedCf != null;
+ DataOutputBuffer buffer = new DataOutputBuffer();
+ DataOutputBuffer headerBuffer = new DataOutputBuffer();
+ ColumnIndexer.serialize(compactedCf, headerBuffer);
+ ColumnFamily.serializer().serializeForSSTable(compactedCf, buffer);
+ out.writeLong(headerBuffer.getLength() + buffer.getLength());
+ out.write(headerBuffer.getData(), 0, headerBuffer.getLength());
+ out.write(buffer.getData(), 0, buffer.getLength());
}
public void update(MessageDigest digest)
{
- if (compactedCf != null)
+ assert compactedCf != null;
+ DataOutputBuffer buffer = new DataOutputBuffer();
+ try
{
- DataOutputBuffer buffer = new DataOutputBuffer();
- try
- {
- ColumnFamily.serializer().serializeCFInfo(compactedCf, buffer);
- buffer.writeInt(compactedCf.getColumnCount());
- digest.update(buffer.getData(), 0, buffer.getLength());
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- compactedCf.updateDigest(digest);
+ ColumnFamily.serializer().serializeCFInfo(compactedCf, buffer);
+ buffer.writeInt(compactedCf.getColumnCount());
+ digest.update(buffer.getData(), 0, buffer.getLength());
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
}
+ compactedCf.updateDigest(digest);
}
public boolean isEmpty()
Modified:
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java?rev=1154369&r1=1154368&r2=1154369&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
(original)
+++
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
Fri Aug 5 20:19:45 2011
@@ -32,6 +32,7 @@ import org.apache.cassandra.CleanupHelpe
import org.apache.cassandra.Util;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.compaction.PrecompactedRow;
import org.apache.cassandra.dht.IPartitioner;
@@ -150,7 +151,8 @@ public abstract class AntiEntropyService
validator.prepare(store);
// add a row
- validator.add(new PrecompactedRow(new DecoratedKey(mid,
ByteBufferUtil.bytes("inconceivable!")), null));
+ validator.add(new PrecompactedRow(new DecoratedKey(mid,
ByteBufferUtil.bytes("inconceivable!")),
+ new
ColumnFamily(DatabaseDescriptor.getCFMetaData(tablename, cfname))));
validator.completeTree();
// confirm that the tree was validated