Author: jbellis
Date: Fri Apr 2 14:05:32 2010
New Revision: 930272
URL: http://svn.apache.org/viewvc?rev=930272&view=rev
Log:
add ReadRepairChance to CF definition. patch by Matthew Dennis; reviewed by
jbellis for CASSANDRA-930
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/conf/storage-conf.xml
cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=930272&r1=930271&r2=930272&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Fri Apr 2 14:05:32 2010
@@ -4,6 +4,7 @@ dev
* switched to slf4j logging (CASSANDRA-625)
* drain method to flush memtables and purge commit log prior to shutdown.
(CASSANDRA-880)
* access levels for authentication/authorization (CASSANDRA-900)
+ * add ReadRepairChance to CF definition (CASSANDRA-930)
0.6.1
Modified: cassandra/trunk/conf/storage-conf.xml
URL:
http://svn.apache.org/viewvc/cassandra/trunk/conf/storage-conf.xml?rev=930272&r1=930271&r2=930272&view=diff
==============================================================================
--- cassandra/trunk/conf/storage-conf.xml (original)
+++ cassandra/trunk/conf/storage-conf.xml Fri Apr 2 14:05:32 2010
@@ -82,6 +82,10 @@
~
~ An optional `Comment` attribute may be used to attach additional
~ human-readable information about the column family to its definition.
+ ~
+ ~ The optional ReadRepairChance attribute (from 0 to 1) specifies
+ ~ the probability with which read repairs should be invoked on non-quorum
+ ~ reads. The default is 1.0 (always read repair).
~
~ The optional KeysCached attribute specifies
~ the number of keys per sstable whose locations we keep in
@@ -103,6 +107,7 @@
<ColumnFamily Name="Standard1" CompareWith="BytesType"/>
<ColumnFamily Name="Standard2"
CompareWith="UTF8Type"
+ ReadRepairChance="0.1"
KeysCached="100%"/>
<ColumnFamily Name="StandardByUUID1" CompareWith="TimeUUIDType" />
<ColumnFamily Name="Super1"
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=930272&r1=930271&r2=930272&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Fri
Apr 2 14:05:32 2010
@@ -40,6 +40,7 @@ import org.apache.commons.lang.ObjectUti
public final class CFMetaData
{
+ public final static double DEFAULT_READ_REPAIR_CHANCE = 1.0;
public final static double DEFAULT_KEY_CACHE_SIZE = 200000;
public final static double DEFAULT_ROW_CACHE_SIZE = 0.0;
@@ -74,9 +75,11 @@ public final class CFMetaData
public final String comment; // for humans only
public final double rowCacheSize; // default 0
public final double keyCacheSize; // default 0.01
+ public final double readRepairChance; //chance 0 to 1, of doing a read repair; defaults 1.0 (always)
public final int cfId;
- private CFMetaData(String tableName, String cfName, String columnType,
AbstractType comparator, AbstractType subcolumnComparator, String comment,
double rowCacheSize, double keyCacheSize, int cfId)
+
+ private CFMetaData(String tableName, String cfName, String columnType,
AbstractType comparator, AbstractType subcolumnComparator, String comment,
double rowCacheSize, double keyCacheSize, double readRepairChance, int cfId)
{
this.tableName = tableName;
this.cfName = cfName;
@@ -86,6 +89,7 @@ public final class CFMetaData
this.comment = comment;
this.rowCacheSize = rowCacheSize;
this.keyCacheSize = keyCacheSize;
+ this.readRepairChance = readRepairChance;
this.cfId = cfId;
currentCfNames.put(cfId, cfName);
cfIdMap.put(new Pair<String, String>(tableName, cfName), cfId);
@@ -93,21 +97,26 @@ public final class CFMetaData
public CFMetaData(String tableName, String cfName, String columnType,
AbstractType comparator, AbstractType subcolumnComparator, String comment,
double rowCacheSize, double keyCacheSize)
{
- this(tableName, cfName, columnType, comparator, subcolumnComparator,
comment, rowCacheSize, keyCacheSize, nextId());
+ this(tableName, cfName, columnType, comparator, subcolumnComparator,
comment, rowCacheSize, keyCacheSize, DEFAULT_READ_REPAIR_CHANCE, nextId());
}
-
+
+ public CFMetaData(String tableName, String cfName, String columnType,
AbstractType comparator, AbstractType subcolumnComparator, String comment,
double rowCacheSize, double keyCacheSize, double readRepairChance)
+ {
+ this(tableName, cfName, columnType, comparator, subcolumnComparator,
comment, rowCacheSize, keyCacheSize, readRepairChance, nextId());
+ }
+
/** clones an existing CFMetaData using the same id. */
public static CFMetaData rename(CFMetaData cfm, String newName)
{
purge(cfm);
- return new CFMetaData(cfm.tableName, newName, cfm.columnType,
cfm.comparator, cfm.subcolumnComparator, cfm.comment, cfm.rowCacheSize,
cfm.keyCacheSize, cfm.cfId);
+ return new CFMetaData(cfm.tableName, newName, cfm.columnType,
cfm.comparator, cfm.subcolumnComparator, cfm.comment, cfm.rowCacheSize,
cfm.keyCacheSize, cfm.readRepairChance, cfm.cfId);
}
/** clones existing CFMetaData. keeps the id but changes the table name.*/
public static CFMetaData renameTable(CFMetaData cfm, String tableName)
{
purge(cfm);
- return new CFMetaData(tableName, cfm.cfName, cfm.columnType,
cfm.comparator, cfm.subcolumnComparator, cfm.comment, cfm.rowCacheSize,
cfm.keyCacheSize, cfm.cfId);
+ return new CFMetaData(tableName, cfm.cfName, cfm.columnType,
cfm.comparator, cfm.subcolumnComparator, cfm.comment, cfm.rowCacheSize,
cfm.keyCacheSize, cfm.readRepairChance, cfm.cfId);
}
/** used for evicting cf data out of static tracking collections. */
@@ -141,6 +150,7 @@ public final class CFMetaData
dout.writeUTF(cfm.comment);
dout.writeDouble(cfm.rowCacheSize);
dout.writeDouble(cfm.keyCacheSize);
+ dout.writeDouble(cfm.readRepairChance);
dout.writeInt(cfm.cfId);
dout.close();
return bout.toByteArray();
@@ -173,10 +183,11 @@ public final class CFMetaData
String comment = din.readBoolean() ? din.readUTF() : null;
double rowCacheSize = din.readDouble();
double keyCacheSize = din.readDouble();
+ double readRepairChance = din.readDouble();
int cfId = din.readInt();
- return new CFMetaData(tableName, cfName, columnType, comparator,
subcolumnComparator, comment, rowCacheSize, keyCacheSize, cfId);
+ return new CFMetaData(tableName, cfName, columnType, comparator,
subcolumnComparator, comment, rowCacheSize, keyCacheSize, readRepairChance,
cfId);
}
-
+
public boolean equals(Object obj)
{
if (!(obj instanceof CFMetaData))
@@ -190,6 +201,7 @@ public final class CFMetaData
&& ObjectUtils.equals(other.comment, comment)
&& other.rowCacheSize == rowCacheSize
&& other.keyCacheSize == keyCacheSize
+ && other.readRepairChance == readRepairChance
&& other.cfId == cfId;
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=930272&r1=930271&r2=930272&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Fri Apr 2 14:05:32 2010
@@ -105,12 +105,6 @@ public class DatabaseDescriptor
private static int memtableThroughput = 64;
/* Number of objects in millions in the memtable before it is dumped */
private static double memtableOperations = 0.1;
- /*
- * This parameter enables or disables consistency checks.
- * If set to false the read repairs are disable for very
- * high throughput on reads but at the cost of consistency.
- */
- private static boolean doConsistencyCheck = true;
/* Job Jar Location */
private static String jobJarFileLocation;
/* Address where to run the job tracker */
@@ -429,13 +423,6 @@ public class DatabaseDescriptor
throw new ConfigurationException("Memtable object count must be a positive double");
}
- /* This parameter enables or disables consistency checks.
- * If set to false the read repairs are disable for very
- * high throughput on reads but at the cost of consistency.*/
- String doConsistency =
xmlUtils.getNodeValue("/Storage/DoConsistencyChecksBoolean");
- if ( doConsistency != null )
- doConsistencyCheck = Boolean.parseBoolean(doConsistency);
-
/* read the size at which we should do column indexes */
String columnIndexSize =
xmlUtils.getNodeValue("/Storage/ColumnIndexSizeInKB");
if(columnIndexSize == null)
@@ -705,12 +692,22 @@ public class DatabaseDescriptor
rowCacheSize = FBUtilities.parseDoubleOrPercent(value);
}
+ double readRepairChance =
CFMetaData.DEFAULT_READ_REPAIR_CHANCE;
+ if ((value = XMLUtils.getAttributeValue(columnFamily,
"ReadRepairChance")) != null)
+ {
+ readRepairChance =
FBUtilities.parseDoubleOrPercent(value);
+ if (readRepairChance < 0.0 || readRepairChance > 1.0)
+ {
+ throw new ConfigurationException("ReadRepairChance must be between 0.0 and 1.0");
+ }
+ }
+
// Parse out user-specified logical names for the various dimensions
// of a the column family from the config.
String comment = xmlUtils.getNodeValue(xqlCF + "Comment");
// insert it into the table dictionary.
- cfDefs[j] = new CFMetaData(tableName, cfName, columnType,
comparator, subcolumnComparator, comment, rowCacheSize, keyCacheSize);
+ cfDefs[j] = new CFMetaData(tableName, cfName, columnType,
comparator, subcolumnComparator, comment, rowCacheSize, keyCacheSize,
readRepairChance);
}
KSMetaData meta = new KSMetaData(ksName, strategyClass,
replicationFactor, snitch, cfDefs);
@@ -878,11 +875,6 @@ public class DatabaseDescriptor
return memtableOperations;
}
- public static boolean getConsistencyCheck()
- {
- return doConsistencyCheck;
- }
-
public static String getClusterName()
{
return clusterName;
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=930272&r1=930271&r2=930272&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Fri
Apr 2 14:05:32 2010
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeoutExcep
import javax.management.MBeanServer;
import javax.management.ObjectName;
+import org.apache.cassandra.config.CFMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang.ArrayUtils;
@@ -61,6 +62,7 @@ public class StorageProxy implements Sto
{
private static final Logger logger =
LoggerFactory.getLogger(StorageProxy.class);
+ private static final Random random = new Random();
// mbean stuff
private static final LatencyTracker readStats = new LatencyTracker();
private static final LatencyTracker rangeStats = new LatencyTracker();
@@ -357,7 +359,7 @@ public class StorageProxy implements Sto
if (logger.isDebugEnabled())
logger.debug("weakreadremote reading " + command + " from " +
message.getMessageId() + "@" + endPoint);
- if (DatabaseDescriptor.getConsistencyCheck())
+ if (randomlyReadRepair(command))
message.setHeader(ReadCommand.DO_REPAIR,
ReadCommand.DO_REPAIR.getBytes());
iars.add(MessagingService.instance.sendRR(message, endPoint));
}
@@ -491,7 +493,7 @@ public class StorageProxy implements Sto
}
catch (DigestMismatchException ex)
{
- if (DatabaseDescriptor.getConsistencyCheck())
+ if (randomlyReadRepair(command))
{
IResponseResolver<Row> readResponseResolverRepair = new
ReadResponseResolver(command.table,
DatabaseDescriptor.getQuorum(command.table));
QuorumResponseHandler<Row> quorumResponseHandlerRepair =
new QuorumResponseHandler<Row>(
@@ -697,6 +699,12 @@ public class StorageProxy implements Sto
}
return ranges;
}
+
+ private static boolean randomlyReadRepair(ReadCommand command)
+ {
+ CFMetaData cfmd =
DatabaseDescriptor.getTableMetaData(command.table).get(command.getColumnFamilyName());
+ return cfmd.readRepairChance > random.nextDouble();
+ }
public long getReadOperations()
{
@@ -761,7 +769,7 @@ public class StorageProxy implements Sto
Row row = command.getRow(table);
// Do the consistency checks in the background
- if (DatabaseDescriptor.getConsistencyCheck())
+ if (randomlyReadRepair(command))
{
List<InetAddress> endpoints =
StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
if (endpoints.size() > 1)