Repository: cassandra
Updated Branches:
  refs/heads/trunk bb4c5c3c4 -> 6f647aaa0


Make it possible to monitor an ideal consistency level separate from actual 
consistency level

Patch by Ariel Weisberg; Reviewed by Jason Brown for CASSANDRA-13289


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6f647aaa
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6f647aaa
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6f647aaa

Branch: refs/heads/trunk
Commit: 6f647aaa0df6f90ee298d372e624c9e3c1ae937e
Parents: bb4c5c3
Author: Ariel Weisberg <aweisb...@apple.com>
Authored: Thu Mar 2 16:46:13 2017 -0500
Committer: Ariel Weisberg <aweisb...@apple.com>
Committed: Thu Mar 30 17:01:20 2017 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 conf/cassandra.yaml                             |   5 +
 .../org/apache/cassandra/config/Config.java     |   8 +
 .../cassandra/config/DatabaseDescriptor.java    |  11 +
 .../locator/AbstractReplicationStrategy.java    |  49 +++-
 .../cassandra/metrics/KeyspaceMetrics.java      |   9 +
 .../service/AbstractWriteResponseHandler.java   | 105 ++++++++-
 .../DatacenterSyncWriteResponseHandler.java     |  30 ++-
 .../service/DatacenterWriteResponseHandler.java |   8 +
 .../apache/cassandra/service/StorageProxy.java  |  26 ++-
 .../cassandra/service/StorageProxyMBean.java    |   5 +
 .../cassandra/service/WriteResponseHandler.java |   4 +
 .../config/DatabaseDescriptorRefTest.java       |   1 +
 .../service/WriteResponseHandlerTest.java       | 234 +++++++++++++++++++
 14 files changed, 474 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f647aaa/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6a164ee..d4b53d0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Make it possible to monitor an ideal consistency level separate from actual 
consistency level (CASSANDRA-13289)
  * Outbound TCP connections ignore internode authenticator (CASSANDRA-13324)
  * Upgrade junit from 4.6 to 4.12 (CASSANDRA-13360)
  * Cleanup ParentRepairSession after repairs (CASSANDRA-13359)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f647aaa/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index d8392a0..f2c4c84 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1120,3 +1120,8 @@ back_pressure_strategy:
 
 # Do not try to coalesce messages if we already got that many messages. This 
should be more than 2 and less than 128.
 # otc_coalescing_enough_coalesced_messages: 8
+
+# Track a metric per keyspace indicating whether replication achieved the 
ideal consistency
+# level for writes without timing out. This is different from the consistency 
level requested by
+# each write which may be lower in order to facilitate availability.
+# ideal_consistency_level: EACH_QUORUM

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f647aaa/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java
index 36ce576..1461cd4 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -32,6 +32,8 @@ import com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.db.ConsistencyLevel;
+
 /**
  * A class that contains configuration properties for the cassandra node it 
runs within.
  *
@@ -271,6 +273,12 @@ public class Config
     public int tracetype_query_ttl = (int) TimeUnit.DAYS.toSeconds(1);
     public int tracetype_repair_ttl = (int) TimeUnit.DAYS.toSeconds(7);
 
+    /**
+     * Maintain statistics on whether writes achieve the ideal consistency 
level
+     * before expiring and becoming hints
+     */
+    public volatile ConsistencyLevel ideal_consistency_level = null;
+
     /*
      * Strategy to use for coalescing messages in OutboundTcpConnection.
      * Can be fixed, movingaverage, timehorizon, disabled. Setting is case and 
leading/trailing

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f647aaa/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 465cd8a..debf161 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.auth.IAuthorizer;
 import org.apache.cassandra.auth.IInternodeAuthenticator;
 import org.apache.cassandra.auth.IRoleManager;
 import org.apache.cassandra.config.Config.CommitLogSync;
+import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.FSWriteError;
@@ -2269,4 +2270,14 @@ public class DatabaseDescriptor
     {
         return backPressureStrategy;
     }
+
+    public static ConsistencyLevel getIdealConsistencyLevel()
+    {
+        return conf.ideal_consistency_level;
+    }
+
+    public static void setIdealConsistencyLevel(ConsistencyLevel cl)
+    {
+        conf.ideal_consistency_level = cl;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f647aaa/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java 
b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index 9c43486..c3498d9 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Multimap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.WriteType;
@@ -40,6 +41,7 @@ import 
org.apache.cassandra.service.DatacenterSyncWriteResponseHandler;
 import org.apache.cassandra.service.DatacenterWriteResponseHandler;
 import org.apache.cassandra.service.WriteResponseHandler;
 import org.apache.cassandra.utils.FBUtilities;
+
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 /**
@@ -135,16 +137,57 @@ public abstract class AbstractReplicationStrategy
                                                                        
WriteType writeType,
                                                                        long 
queryStartNanoTime)
     {
+        return getWriteResponseHandler(naturalEndpoints, pendingEndpoints, 
consistency_level, callback, writeType, queryStartNanoTime, 
DatabaseDescriptor.getIdealConsistencyLevel());
+    }
+
+    public <T> AbstractWriteResponseHandler<T> 
getWriteResponseHandler(Collection<InetAddress> naturalEndpoints,
+                                                                       
Collection<InetAddress> pendingEndpoints,
+                                                                       
ConsistencyLevel consistency_level,
+                                                                       
Runnable callback,
+                                                                       
WriteType writeType,
+                                                                       long 
queryStartNanoTime,
+                                                                       
ConsistencyLevel idealConsistencyLevel)
+    {
+        AbstractWriteResponseHandler resultResponseHandler;
         if (consistency_level.isDatacenterLocal())
         {
             // block for in this context will be localnodes block.
-            return new DatacenterWriteResponseHandler<T>(naturalEndpoints, 
pendingEndpoints, consistency_level, getKeyspace(), callback, writeType, 
queryStartNanoTime);
+            resultResponseHandler = new 
DatacenterWriteResponseHandler<T>(naturalEndpoints, pendingEndpoints, 
consistency_level, getKeyspace(), callback, writeType, queryStartNanoTime);
         }
         else if (consistency_level == ConsistencyLevel.EACH_QUORUM && (this 
instanceof NetworkTopologyStrategy))
         {
-            return new DatacenterSyncWriteResponseHandler<T>(naturalEndpoints, 
pendingEndpoints, consistency_level, getKeyspace(), callback, writeType, 
queryStartNanoTime);
+            resultResponseHandler = new 
DatacenterSyncWriteResponseHandler<T>(naturalEndpoints, pendingEndpoints, 
consistency_level, getKeyspace(), callback, writeType, queryStartNanoTime);
         }
-        return new WriteResponseHandler<T>(naturalEndpoints, pendingEndpoints, 
consistency_level, getKeyspace(), callback, writeType, queryStartNanoTime);
+        else
+        {
+            resultResponseHandler = new 
WriteResponseHandler<T>(naturalEndpoints, pendingEndpoints, consistency_level, 
getKeyspace(), callback, writeType, queryStartNanoTime);
+        }
+
+        //Check if tracking the ideal consistency level is configured
+        if (idealConsistencyLevel != null)
+        {
+            //If ideal and requested are the same just use this handler to 
track the ideal consistency level
+            //This is also used so that the ideal consistency level handler 
when constructed knows it is the ideal
+            //one for tracking purposes
+            if (idealConsistencyLevel == consistency_level)
+            {
+                
resultResponseHandler.setIdealCLResponseHandler(resultResponseHandler);
+            }
+            else
+            {
+                //Construct a delegate response handler to use to track the 
ideal consistency level
+                AbstractWriteResponseHandler idealHandler = 
getWriteResponseHandler(naturalEndpoints,
+                                                                               
     pendingEndpoints,
+                                                                               
     idealConsistencyLevel,
+                                                                               
     callback,
+                                                                               
     writeType,
+                                                                               
     queryStartNanoTime,
+                                                                               
     idealConsistencyLevel);
+                resultResponseHandler.setIdealCLResponseHandler(idealHandler);
+            }
+        }
+
+        return resultResponseHandler;
     }
 
     private Keyspace getKeyspace()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f647aaa/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java 
b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
index 2e1c384..63f8dd0 100644
--- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
@@ -19,8 +19,10 @@ package org.apache.cassandra.metrics;
 
 import java.util.Set;
 
+import com.codahale.metrics.Counter;
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
 import com.codahale.metrics.Timer;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
@@ -91,6 +93,10 @@ public class KeyspaceMetrics
     public final LatencyMetrics casPropose;
     /** CAS Commit metrics */
     public final LatencyMetrics casCommit;
+    /** Writes failed ideal consistency **/
+    public final Counter writeFailedIdealCL;
+    /** Ideal CL write latency metrics */
+    public final LatencyMetrics idealCLWriteLatency;
 
     public final MetricNameFactory factory;
     private Keyspace keyspace;
@@ -236,6 +242,8 @@ public class KeyspaceMetrics
         casPrepare = new LatencyMetrics(factory, "CasPrepare");
         casPropose = new LatencyMetrics(factory, "CasPropose");
         casCommit = new LatencyMetrics(factory, "CasCommit");
+        writeFailedIdealCL = 
Metrics.counter(factory.createMetricName("WriteFailedIdealCL"));
+        idealCLWriteLatency = new LatencyMetrics(factory, "IdealCLWrite");
     }
 
     /**
@@ -251,6 +259,7 @@ public class KeyspaceMetrics
         readLatency.release();
         writeLatency.release();
         rangeLatency.release();
+        idealCLWriteLatency.release();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f647aaa/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java 
b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index 8c30b89..b5eaadb 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import com.google.common.collect.Iterables;
@@ -40,8 +41,10 @@ import org.apache.cassandra.utils.concurrent.SimpleCondition;
 
 public abstract class AbstractWriteResponseHandler<T> implements 
IAsyncCallbackWithFailure<T>
 {
-    protected static final Logger logger = LoggerFactory.getLogger( 
AbstractWriteResponseHandler.class );
+    protected static final Logger logger = 
LoggerFactory.getLogger(AbstractWriteResponseHandler.class);
 
+    //Count down until all responses and expirations have occured before 
deciding whether the ideal CL was reached.
+    private AtomicInteger responsesAndExpirations;
     private final SimpleCondition condition = new SimpleCondition();
     protected final Keyspace keyspace;
     protected final Collection<InetAddress> naturalEndpoints;
@@ -50,14 +53,22 @@ public abstract class AbstractWriteResponseHandler<T> 
implements IAsyncCallbackW
     protected final Collection<InetAddress> pendingEndpoints;
     protected final WriteType writeType;
     private static final 
AtomicIntegerFieldUpdater<AbstractWriteResponseHandler> failuresUpdater
-        = 
AtomicIntegerFieldUpdater.newUpdater(AbstractWriteResponseHandler.class, 
"failures");
+    = AtomicIntegerFieldUpdater.newUpdater(AbstractWriteResponseHandler.class, 
"failures");
     private volatile int failures = 0;
     private final Map<InetAddress, RequestFailureReason> 
failureReasonByEndpoint;
     private final long queryStartNanoTime;
     private volatile boolean supportsBackPressure = true;
 
     /**
-     * @param callback A callback to be called when the write is successful.
+      * Delegate to another WriteReponseHandler or possibly this one to track 
if the ideal consistency level was reached.
+      * Will be set to null if ideal CL was not configured
+      * Will be set to an AWRH delegate if ideal CL was configured
+      * Will be same as "this" if this AWRH is the ideal consistency level
+      */
+    private AbstractWriteResponseHandler idealCLDelegate;
+
+    /**
+     * @param callback           A callback to be called when the write is 
successful.
      * @param queryStartNanoTime
      */
     protected AbstractWriteResponseHandler(Keyspace keyspace,
@@ -119,6 +130,64 @@ public abstract class AbstractWriteResponseHandler<T> 
implements IAsyncCallbackW
     }
 
     /**
+     * Set a delegate ideal CL write response handler. Note that this could be 
the same as this
+     * if the ideal CL and requested CL are the same.
+     */
+    public void setIdealCLResponseHandler(AbstractWriteResponseHandler handler)
+    {
+        this.idealCLDelegate = handler;
+        idealCLDelegate.responsesAndExpirations = new 
AtomicInteger(naturalEndpoints.size() + pendingEndpoints.size());
+    }
+
+    /**
+     * This logs the response but doesn't do any further processing related to 
this write response handler
+     * on whether the CL was achieved. Only call this after the subclass has 
completed all it's processing
+     * since the subclass instance may be queried to find out if the CL was 
achieved.
+     */
+    protected final void logResponseToIdealCLDelegate(MessageIn<T> m)
+    {
+        //Tracking ideal CL was not configured
+        if (idealCLDelegate == null)
+        {
+            return;
+        }
+
+        if (idealCLDelegate == this)
+        {
+            //Processing of the message was already done since this is the 
handler for the
+            //ideal consistency level. Just decrement the counter.
+            decrementResponseOrExpired();
+        }
+        else
+        {
+            //Let the delegate do full processing, this will loop back into 
the branch above
+            //with idealCLDelegate == this, because the ideal write handler 
idealCLDelegate will always
+            //be set to this in the delegate.
+            idealCLDelegate.response(m);
+        }
+    }
+
+    public final void expired()
+    {
+        //Tracking ideal CL was not configured
+        if (idealCLDelegate == null)
+        {
+            return;
+        }
+
+        //The requested CL matched ideal CL so reuse this object
+        if (idealCLDelegate == this)
+        {
+            decrementResponseOrExpired();
+        }
+        else
+        {
+            //Have the delegate track the expired response
+            idealCLDelegate.decrementResponseOrExpired();
+        }
+    }
+
+    /**
      * @return the minimum number of endpoints that must reply.
      */
     protected int totalBlockFor()
@@ -149,7 +218,9 @@ public abstract class AbstractWriteResponseHandler<T> 
implements IAsyncCallbackW
      */
     protected abstract int ackCount();
 
-    /** null message means "response from local write" */
+    /**
+     * null message means "response from local write"
+     */
     public abstract void response(MessageIn<T> msg);
 
     public void assureSufficientLiveNodes() throws UnavailableException
@@ -170,8 +241,8 @@ public abstract class AbstractWriteResponseHandler<T> 
implements IAsyncCallbackW
         logger.trace("Got failure from {}", from);
 
         int n = waitingFor(from)
-              ? failuresUpdater.incrementAndGet(this)
-              : failures;
+                ? failuresUpdater.incrementAndGet(this)
+                : failures;
 
         failureReasonByEndpoint.put(from, failureReason);
 
@@ -189,4 +260,26 @@ public abstract class AbstractWriteResponseHandler<T> 
implements IAsyncCallbackW
     {
         this.supportsBackPressure = supportsBackPressure;
     }
+
+    /**
+     * Decrement the counter for all responses/expirations and if the counter
+     * hits 0 check to see if the ideal consistency level (this write response 
handler)
+     * was reached using the signal.
+     */
+    private final void decrementResponseOrExpired()
+    {
+        int decrementedValue = responsesAndExpirations.decrementAndGet();
+        if (decrementedValue == 0)
+        {
+            //The condition being signaled is a valid proxy for the CL being 
achieved
+            if (!condition.isSignaled())
+            {
+                keyspace.metric.writeFailedIdealCL.inc();
+            }
+            else
+            {
+                keyspace.metric.idealCLWriteLatency.addNano(System.nanoTime() 
- queryStartNanoTime);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f647aaa/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java 
b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
index 9584611..4137e3a 100644
--- 
a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
+++ 
b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
@@ -71,21 +71,29 @@ public class DatacenterSyncWriteResponseHandler<T> extends 
AbstractWriteResponse
 
     public void response(MessageIn<T> message)
     {
-        String dataCenter = message == null
-                            ? DatabaseDescriptor.getLocalDataCenter()
-                            : snitch.getDatacenter(message.from);
+        try
+        {
+            String dataCenter = message == null
+                                ? DatabaseDescriptor.getLocalDataCenter()
+                                : snitch.getDatacenter(message.from);
+
+            responses.get(dataCenter).getAndDecrement();
+            acks.incrementAndGet();
 
-        responses.get(dataCenter).getAndDecrement();
-        acks.incrementAndGet();
+            for (AtomicInteger i : responses.values())
+            {
+                if (i.get() > 0)
+                    return;
+            }
 
-        for (AtomicInteger i : responses.values())
+            // all the quorum conditions are met
+            signal();
+        }
+        finally
         {
-            if (i.get() > 0)
-                return;
+            //Must be last after all subclass processing
+            logResponseToIdealCLDelegate(message);
         }
-
-        // all the quorum conditions are met
-        signal();
     }
 
     protected int ackCount()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f647aaa/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java 
b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
index 2309e87..83dddcf 100644
--- a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
@@ -46,7 +46,15 @@ public class DatacenterWriteResponseHandler<T> extends 
WriteResponseHandler<T>
     public void response(MessageIn<T> message)
     {
         if (message == null || waitingFor(message.from))
+        {
             super.response(message);
+        }
+        else
+        {
+            //WriteResponseHandler.response will call 
logResonseToIdealCLDelegate so only do it if not calling 
WriteResponseHandler.response.
+            //Must be last after all subclass processing
+            logResponseToIdealCLDelegate(message);
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f647aaa/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index 0585717..6be5286 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -561,9 +561,16 @@ public class StorageProxy implements StorageProxyMBean
                     MessagingService.instance().sendOneWay(message, 
destination);
                 }
             }
-            else if (shouldHint)
+            else
             {
-                submitHint(proposal.makeMutation(), destination, null);
+                if (responseHandler != null)
+                {
+                    responseHandler.expired();
+                }
+                if (shouldHint)
+                {
+                    submitHint(proposal.makeMutation(), destination, null);
+                }
             }
         }
 
@@ -1257,6 +1264,8 @@ public class StorageProxy implements StorageProxyMBean
             }
             else
             {
+                //Immediately mark the response as expired since the request 
will not be sent
+                responseHandler.expired();
                 if (shouldHint(destination))
                 {
                     if (endpointsToHint == null)
@@ -2774,4 +2783,17 @@ public class StorageProxy implements StorageProxyMBean
     {
         return Schema.instance.getNumberOfTables();
     }
+
+    public String getIdealConsistencyLevel()
+    {
+        return DatabaseDescriptor.getIdealConsistencyLevel().toString();
+    }
+
+    public String setIdealConsistencyLevel(String cl)
+    {
+        ConsistencyLevel original = 
DatabaseDescriptor.getIdealConsistencyLevel();
+        ConsistencyLevel newCL = 
ConsistencyLevel.valueOf(cl.trim().toUpperCase());
+        DatabaseDescriptor.setIdealConsistencyLevel(newCL);
+        return String.format("Updating ideal consistency level new value: %s 
old value %s", newCL, original.toString());
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f647aaa/src/java/org/apache/cassandra/service/StorageProxyMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxyMBean.java 
b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
index 0a4ba19..97f7615 100644
--- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
@@ -21,6 +21,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.cassandra.db.ConsistencyLevel;
+
 public interface StorageProxyMBean
 {
     public long getTotalHints();
@@ -63,4 +65,7 @@ public interface StorageProxyMBean
     public Map<String, List<String>> getSchemaVersions();
 
     public int getNumberOfTables();
+
+    public String getIdealConsistencyLevel();
+    public String setIdealConsistencyLevel(String cl);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f647aaa/src/java/org/apache/cassandra/service/WriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java 
b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
index 46e4e93..55ca5aa 100644
--- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
@@ -68,6 +68,10 @@ public class WriteResponseHandler<T> extends 
AbstractWriteResponseHandler<T>
     {
         if (responsesUpdater.decrementAndGet(this) == 0)
             signal();
+        //Must be last after all subclass processing
+        //The two current subclasses both assume logResponseToIdealCLDelegate 
is called
+        //here.
+        logResponseToIdealCLDelegate(m);
     }
 
     protected int ackCount()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f647aaa/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java 
b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
index c8f8bc1..b915854 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
@@ -81,6 +81,7 @@ public class DatabaseDescriptorRefTest
     "org.apache.cassandra.config.YamlConfigurationLoader$PropertiesChecker$1",
     "org.apache.cassandra.config.YamlConfigurationLoader$CustomConstructor",
     "org.apache.cassandra.config.TransparentDataEncryptionOptions",
+    "org.apache.cassandra.db.ConsistencyLevel",
     "org.apache.cassandra.dht.IPartitioner",
     "org.apache.cassandra.exceptions.ConfigurationException",
     "org.apache.cassandra.exceptions.RequestValidationException",

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f647aaa/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java 
b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java
new file mode 100644
index 0000000..815dbf6
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.WriteType;
+import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.schema.KeyspaceParams;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class WriteResponseHandlerTest
+{
+    static Keyspace ks;
+    static ColumnFamilyStore cfs;
+    static List<InetAddress> targets;
+
+    @BeforeClass
+    public static void setUpClass() throws Throwable
+    {
+        SchemaLoader.loadSchema();
+        // Register peers with expected DC for NetworkTopologyStrategy.
+        TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+        metadata.clearUnsafe();
+        metadata.updateHostId(UUID.randomUUID(), 
InetAddress.getByName("127.1.0.255"));
+        metadata.updateHostId(UUID.randomUUID(), 
InetAddress.getByName("127.2.0.255"));
+
+        DatabaseDescriptor.setEndpointSnitch(new IEndpointSnitch()
+        {
+            public String getRack(InetAddress endpoint)
+            {
+                return null;
+            }
+
+            public String getDatacenter(InetAddress endpoint)
+            {
+                byte[] address = endpoint.getAddress();
+                if (address[1] == 1)
+                    return "datacenter1";
+                else
+                    return "datacenter2";
+            }
+
+            public List<InetAddress> getSortedListByProximity(InetAddress 
address, Collection<InetAddress> unsortedAddress)
+            {
+                return null;
+            }
+
+            public void sortByProximity(InetAddress address, List<InetAddress> 
addresses)
+            {
+
+            }
+
+            public int compareEndpoints(InetAddress target, InetAddress a1, 
InetAddress a2)
+            {
+                return 0;
+            }
+
+            public void gossiperStarting()
+            {
+
+            }
+
+            public boolean isWorthMergingForRangeQuery(List<InetAddress> 
merged, List<InetAddress> l1, List<InetAddress> l2)
+            {
+                return false;
+            }
+        });
+        
DatabaseDescriptor.setBroadcastAddress(InetAddress.getByName("127.1.0.1"));
+        SchemaLoader.createKeyspace("Foo", KeyspaceParams.nts("datacenter1", 
3, "datacenter2", 3), SchemaLoader.standardCFMD("Foo", "Bar"));
+        ks = Keyspace.open("Foo");
+        cfs = ks.getColumnFamilyStore("Bar");
+        targets = ImmutableList.of(InetAddress.getByName("127.1.0.255"), 
InetAddress.getByName("127.1.0.254"), InetAddress.getByName("127.1.0.253"),
+                                   InetAddress.getByName("127.2.0.255"), 
InetAddress.getByName("127.2.0.254"), InetAddress.getByName("127.2.0.253"));
+    }
+
+
+    @Before
+    public void resetCounters()
+    {
+        
ks.metric.writeFailedIdealCL.dec(ks.metric.writeFailedIdealCL.getCount());
+    }
+
+    /**
+     * Validate that failing to achieve ideal CL increments the failure counter
+     * @throws Throwable
+     */
+    @Test
+    public void failedIdealCLIncrementsStat() throws Throwable
+    {
+        AbstractWriteResponseHandler awr = 
createWriteResponseHandler(ConsistencyLevel.LOCAL_QUORUM, 
ConsistencyLevel.EACH_QUORUM);
+
+        //Succeed in local DC
+        awr.response(createDummyMessage(0));
+        awr.response(createDummyMessage(1));
+        awr.response(createDummyMessage(2));
+
+        //Fail in remote DC
+        awr.expired();
+        awr.expired();
+        awr.expired();
+        assertEquals(1, ks.metric.writeFailedIdealCL.getCount());
+        assertEquals(0, ks.metric.idealCLWriteLatency.totalLatency.getCount());
+    }
+
+    /**
+     * Validate that a successful write at ideal CL logs latency information. 
Also validates
+     * DatacenterSyncWriteResponseHandler
+     * @throws Throwable
+     */
+    @Test
+    public void idealCLLatencyTracked() throws Throwable
+    {
+        long startingCount = ks.metric.idealCLWriteLatency.latency.getCount();
+        //Specify query start time in past to ensure minimum latency 
measurement
+        AbstractWriteResponseHandler awr = 
createWriteResponseHandler(ConsistencyLevel.LOCAL_QUORUM, 
ConsistencyLevel.EACH_QUORUM, System.nanoTime() - TimeUnit.DAYS.toNanos(1));
+
+        //dc1
+        awr.response(createDummyMessage(0));
+        awr.response(createDummyMessage(1));
+        //dc2
+        awr.response(createDummyMessage(4));
+        awr.response(createDummyMessage(5));
+
+        //Don't need the others
+        awr.expired();
+        awr.expired();
+
+        assertEquals(0,  ks.metric.writeFailedIdealCL.getCount());
+        assertTrue( TimeUnit.DAYS.toMicros(1) < 
ks.metric.idealCLWriteLatency.totalLatency.getCount());
+        assertEquals(startingCount + 1, 
ks.metric.idealCLWriteLatency.latency.getCount());
+    }
+
+    /**
+     * Validate that WriteResponseHandler does the right thing on success.
+     * @throws Throwable
+     */
+    @Test
+    public void idealCLWriteResponeHandlerWorks() throws Throwable
+    {
+        long startingCount = ks.metric.idealCLWriteLatency.latency.getCount();
+        AbstractWriteResponseHandler awr = 
createWriteResponseHandler(ConsistencyLevel.LOCAL_QUORUM, ConsistencyLevel.ALL);
+
+        //dc1
+        awr.response(createDummyMessage(0));
+        awr.response(createDummyMessage(1));
+        awr.response(createDummyMessage(2));
+        //dc2
+        awr.response(createDummyMessage(3));
+        awr.response(createDummyMessage(4));
+        awr.response(createDummyMessage(5));
+
+        assertEquals(0,  ks.metric.writeFailedIdealCL.getCount());
+        assertEquals(startingCount + 1, 
ks.metric.idealCLWriteLatency.latency.getCount());
+    }
+
+    /**
+     * Validate that DatacenterWriteResponseHandler does the right thing on 
success.
+     * @throws Throwable
+     */
+    @Test
+    public void idealCLDatacenterWriteResponeHandlerWorks() throws Throwable
+    {
+        long startingCount = ks.metric.idealCLWriteLatency.latency.getCount();
+        AbstractWriteResponseHandler awr = 
createWriteResponseHandler(ConsistencyLevel.ONE, ConsistencyLevel.LOCAL_QUORUM);
+
+        //dc1
+        awr.response(createDummyMessage(0));
+        awr.response(createDummyMessage(1));
+        awr.response(createDummyMessage(2));
+        //dc2
+        awr.response(createDummyMessage(3));
+        awr.response(createDummyMessage(4));
+        awr.response(createDummyMessage(5));
+
+        assertEquals(0,  ks.metric.writeFailedIdealCL.getCount());
+        assertEquals(startingCount + 1, 
ks.metric.idealCLWriteLatency.latency.getCount());
+    }
+
+    private static AbstractWriteResponseHandler 
createWriteResponseHandler(ConsistencyLevel cl, ConsistencyLevel ideal)
+    {
+        return createWriteResponseHandler(cl, ideal, System.nanoTime());
+    }
+
+    private static AbstractWriteResponseHandler 
createWriteResponseHandler(ConsistencyLevel cl, ConsistencyLevel ideal, long 
queryStartTime)
+    {
+        return ks.getReplicationStrategy().getWriteResponseHandler(targets, 
ImmutableList.of(), cl, new Runnable() {
+            public void run()
+            {
+
+            }
+        }, WriteType.SIMPLE, queryStartTime, ideal);
+    }
+
+    private static MessageIn createDummyMessage(int target)
+    {
+        return MessageIn.create(targets.get(target), null, null,  null, 0, 0L);
+    }
+}

Reply via email to