Repository: cassandra
Updated Branches:
  refs/heads/trunk 0f67b540e -> 7d266b9e7


Retry acquire MV lock on failure instead of throwing WTE on streaming

Patch by Benjamin Roth; Reviewed by Paulo Motta for CASSANDRA-12905


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3faa0d92
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3faa0d92
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3faa0d92

Branch: refs/heads/trunk
Commit: 3faa0d925791be085b92949a0a0ec20f7e6ae368
Parents: 9fc1ffb
Author: brstgt <[email protected]>
Authored: Thu Dec 15 12:42:31 2016 -0200
Committer: Paulo Motta <[email protected]>
Committed: Thu Dec 15 16:46:00 2016 -0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 src/java/org/apache/cassandra/db/Keyspace.java  | 92 +++++++++++---------
 src/java/org/apache/cassandra/db/Mutation.java  | 17 ++--
 .../db/commitlog/CommitLogReplayer.java         | 10 +--
 .../cassandra/service/paxos/PaxosState.java     |  9 +-
 .../cassandra/streaming/StreamReceiveTask.java  |  5 +-
 6 files changed, 63 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3faa0d92/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e69bf08..63e095d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.11
+ * Improve contention handling on failure to acquire MV lock for streaming and hints (CASSANDRA-12905)
  * Fix DELETE and UPDATE queries with empty IN restrictions (CASSANDRA-12829)
  * Mark MVs as built after successful bootstrap (CASSANDRA-12984)
  * Estimated TS drop-time histogram updated with Cell.NO_DELETION_TIME (CASSANDRA-13040)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3faa0d92/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index ec5102b..3715995 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -26,6 +26,8 @@ import java.util.concurrent.locks.Lock;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
@@ -50,8 +52,6 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * It represents a Keyspace.
@@ -62,7 +62,7 @@ public class Keyspace
 
     private static final String TEST_FAIL_WRITES_KS = System.getProperty("cassandra.test.fail_writes_ks", "");
     private static final boolean TEST_FAIL_WRITES = !TEST_FAIL_WRITES_KS.isEmpty();
-    private static int TEST_FAIL_MV_LOCKS_COUNT = Integer.getInteger(System.getProperty("cassandra.test.fail_mv_locks_count", "0"), 0);
+    private static int TEST_FAIL_MV_LOCKS_COUNT = Integer.getInteger("cassandra.test.fail_mv_locks_count", 0);
 
     public final KeyspaceMetrics metric;
 
@@ -379,42 +379,40 @@ public class Keyspace
         }
     }
 
-    public CompletableFuture<?> apply(Mutation mutation, boolean 
writeCommitLog)
-    {
-        return apply(mutation, writeCommitLog, true, false, null);
-    }
-
-    /**
-     * Should be used if caller is blocking and runs in mutation stage.
-     * Otherwise there is a race condition where ALL mutation workers are 
beeing blocked ending
-     * in a complete deadlock of the mutation stage. See CASSANDRA-12689.
-     *
-     * @param mutation
-     * @param writeCommitLog
-     * @return
-     */
-    public CompletableFuture<?> applyNotDeferrable(Mutation mutation, boolean 
writeCommitLog)
+    public CompletableFuture<?> applyFuture(Mutation mutation, boolean 
writeCommitLog, boolean updateIndexes)
     {
-        return apply(mutation, writeCommitLog, true, false, false, null);
+        return apply(mutation, writeCommitLog, updateIndexes, true, true, 
null);
     }
 
-    public CompletableFuture<?> apply(Mutation mutation, boolean 
writeCommitLog, boolean updateIndexes)
+    public void apply(Mutation mutation, boolean writeCommitLog, boolean 
updateIndexes)
     {
-        return apply(mutation, writeCommitLog, updateIndexes, false, null);
+        apply(mutation, writeCommitLog, updateIndexes, true);
     }
 
-    public CompletableFuture<?> applyFromCommitLog(Mutation mutation)
+    public void apply(final Mutation mutation,
+                      final boolean writeCommitLog)
     {
-        return apply(mutation, false, true, true, null);
+        apply(mutation, writeCommitLog, true, true);
     }
 
-    public CompletableFuture<?> apply(final Mutation mutation,
-                                      final boolean writeCommitLog,
-                                      boolean updateIndexes,
-                                      boolean isClReplay,
-                                      CompletableFuture<?> future)
+    /**
+     * If apply is blocking, apply must not be deferred
+     * Otherwise there is a race condition where ALL mutation workers are 
beeing blocked ending
+     * in a complete deadlock of the mutation stage. See CASSANDRA-12689.
+     *
+     * @param mutation       the row to write.  Must not be modified after 
calling apply, since commitlog append
+     *                       may happen concurrently, depending on the CL 
Executor type.
+     * @param writeCommitLog false to disable commitlog append entirely
+     * @param updateIndexes  false to disable index updates (used by 
CollationController "defragmenting")
+     * @param isDroppable    true if this should throw WriteTimeoutException 
if it does not acquire lock within write_request_timeout_in_ms
+     * @throws ExecutionException
+     */
+    public void apply(final Mutation mutation,
+                      final boolean writeCommitLog,
+                      boolean updateIndexes,
+                      boolean isDroppable)
     {
-        return apply(mutation, writeCommitLog, updateIndexes, isClReplay, 
true, future);
+        apply(mutation, writeCommitLog, updateIndexes, isDroppable, false, 
null);
     }
 
     /**
@@ -424,13 +422,13 @@ public class Keyspace
      *                       may happen concurrently, depending on the CL 
Executor type.
      * @param writeCommitLog false to disable commitlog append entirely
      * @param updateIndexes  false to disable index updates (used by 
CollationController "defragmenting")
-     * @param isClReplay     true if caller is the commitlog replayer
+     * @param isDroppable    true if this should throw WriteTimeoutException 
if it does not acquire lock within write_request_timeout_in_ms
      * @param isDeferrable   true if caller is not waiting for future to 
complete, so that future may be deferred
      */
-    public CompletableFuture<?> apply(final Mutation mutation,
+    private CompletableFuture<?> apply(final Mutation mutation,
                                       final boolean writeCommitLog,
                                       boolean updateIndexes,
-                                      boolean isClReplay,
+                                      boolean isDroppable,
                                       boolean isDeferrable,
                                       CompletableFuture<?> future)
     {
@@ -438,7 +436,11 @@ public class Keyspace
             throw new RuntimeException("Testing write failures");
 
         boolean requiresViewUpdate = updateIndexes && 
viewManager.updatesAffectView(Collections.singleton(mutation), false);
-        final CompletableFuture<?> mark = future == null ? new 
CompletableFuture<>() : future;
+
+        // If apply is not deferrable, no future is required, returns always 
null
+        if (isDeferrable && future == null) {
+            future = new CompletableFuture<>();
+        }
 
         Lock lock = null;
         if (requiresViewUpdate)
@@ -453,15 +455,15 @@ public class Keyspace
 
                 if (lock == null)
                 {
-                    // avoid throwing a WTE during commitlog replay
-                    if (!isClReplay && (System.currentTimeMillis() - 
mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout())
+                    //throw WTE only if request is droppable
+                    if (isDroppable && (System.currentTimeMillis() - 
mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout())
                     {
                         logger.trace("Could not acquire lock for {}", 
ByteBufferUtil.bytesToHex(mutation.key().getKey()));
                         Tracing.trace("Could not acquire MV lock");
                         if (future != null)
                         {
                             future.completeExceptionally(new 
WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1));
-                            return mark;
+                            return future;
                         }
                         else
                         {
@@ -472,11 +474,12 @@ public class Keyspace
                     {
                         //This view update can't happen right now. so rather 
than keep this thread busy
                         // we will re-apply ourself to the queue and try again 
later
+                        final CompletableFuture<?> mark = future;
                         StageManager.getStage(Stage.MUTATION).execute(() ->
-                                apply(mutation, writeCommitLog, true, 
isClReplay, mark)
+                                apply(mutation, writeCommitLog, true, 
isDroppable, true, mark)
                         );
 
-                        return mark;
+                        return future;
                     }
                     else
                     {
@@ -499,7 +502,9 @@ public class Keyspace
                 else
                 {
                     long acquireTime = System.currentTimeMillis() - 
mutation.viewLockAcquireStart.get();
-                    if (!isClReplay)
+                    // Metrics are only collected for droppable write 
operations
+                    // Bulk non-droppable operations (e.g. commitlog replay, 
hint delivery) are not measured
+                    if (isDroppable)
                     {
                         for (UUID cfid : mutation.getColumnFamilyIds())
                             
columnFamilyStores.get(cfid).metric.viewLockAcquireTime.update(acquireTime, 
TimeUnit.MILLISECONDS);
@@ -534,7 +539,7 @@ public class Keyspace
                     try
                     {
                         Tracing.trace("Creating materialized view mutations 
from base table replica");
-                        
viewManager.forTable(upd.metadata()).pushViewReplicaUpdates(upd, writeCommitLog 
&& !isClReplay, baseComplete);
+                        
viewManager.forTable(upd.metadata()).pushViewReplicaUpdates(upd, 
writeCommitLog, baseComplete);
                     }
                     catch (Throwable t)
                     {
@@ -553,8 +558,11 @@ public class Keyspace
                 if (requiresViewUpdate)
                     baseComplete.set(System.currentTimeMillis());
             }
-            mark.complete(null);
-            return mark;
+
+            if (future != null) {
+                future.complete(null);
+            }
+            return future;
         }
         finally
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3faa0d92/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index 2955677..7ed69c0 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -202,20 +202,17 @@ public class Mutation implements IMutation
     public CompletableFuture<?> applyFuture()
     {
         Keyspace ks = Keyspace.open(keyspaceName);
-        return ks.apply(this, 
Keyspace.open(keyspaceName).getMetadata().params.durableWrites);
+        return ks.applyFuture(this, 
Keyspace.open(keyspaceName).getMetadata().params.durableWrites, true);
+    }
+
+    public void apply(boolean durableWrites, boolean isDroppable)
+    {
+        Keyspace.open(keyspaceName).apply(this, durableWrites, true, 
isDroppable);
     }
 
     public void apply(boolean durableWrites)
     {
-        try
-        {
-            Keyspace ks = Keyspace.open(keyspaceName);
-            Uninterruptibles.getUninterruptibly(ks.applyNotDeferrable(this, 
durableWrites));
-        }
-        catch (ExecutionException e)
-        {
-            throw Throwables.propagate(e.getCause());
-        }
+        apply(durableWrites, true);
     }
 
     /*

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3faa0d92/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index af8efb4..d53f0f8 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -642,15 +642,7 @@ public class CommitLogReplayer
                 {
                     assert !newMutation.isEmpty();
 
-                    try
-                    {
-                        
Uninterruptibles.getUninterruptibly(Keyspace.open(newMutation.getKeyspaceName()).applyFromCommitLog(newMutation));
-                    }
-                    catch (ExecutionException e)
-                    {
-                        throw Throwables.propagate(e.getCause());
-                    }
-
+                    
Keyspace.open(newMutation.getKeyspaceName()).apply(newMutation, false, true, 
false);
                     keyspacesRecovered.add(keyspace);
                 }
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3faa0d92/src/java/org/apache/cassandra/service/paxos/PaxosState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
index 0940950..ee1ba6a 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
@@ -147,14 +147,7 @@ public class PaxosState
             {
                 Tracing.trace("Committing proposal {}", proposal);
                 Mutation mutation = proposal.makeMutation();
-                try
-                {
-                    
Uninterruptibles.getUninterruptibly(Keyspace.open(mutation.getKeyspaceName()).applyNotDeferrable(mutation,
 true));
-                }
-                catch (ExecutionException e)
-                {
-                    throw Throwables.propagate(e.getCause());
-                }
+                Keyspace.open(mutation.getKeyspaceName()).apply(mutation, 
true);
             }
             else
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3faa0d92/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 040906b..b6b8387 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -178,14 +178,15 @@ public class StreamReceiveTask extends StreamTask
                     {
                         for (SSTableReader reader : readers)
                         {
+                            Keyspace ks = 
Keyspace.open(reader.getKeyspaceName());
                             try (ISSTableScanner scanner = reader.getScanner())
                             {
                                 while (scanner.hasNext())
                                 {
                                     try (UnfilteredRowIterator rowIterator = 
scanner.next())
                                     {
-                                        //Apply unsafe (we will flush below 
before transaction is done)
-                                        new 
Mutation(PartitionUpdate.fromIterator(rowIterator)).applyUnsafe();
+                                        // MV *can* be applied unsafe as we 
flush below before transaction is done.
+                                        ks.apply(new 
Mutation(PartitionUpdate.fromIterator(rowIterator)), false, true, false);
                                     }
                                 }
                             }

Reply via email to