Repository: cassandra
Updated Branches:
  refs/heads/trunk 40aeaf0c1 -> d31ed0f51


Abstract write path for pluggable storage

Patch by Blake Eggleston; Reviewed by Aleksey Yeschenko for CASSANDRA-14118


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d31ed0f5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d31ed0f5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d31ed0f5

Branch: refs/heads/trunk
Commit: d31ed0f51b23d8fce892695ecd82d4f843f31f4c
Parents: 40aeaf0
Author: Blake Eggleston <bdeggles...@gmail.com>
Authored: Tue Mar 20 17:36:30 2018 -0700
Committer: Blake Eggleston <bdeggles...@gmail.com>
Committed: Sun Apr 15 17:39:58 2018 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/CassandraKeyspaceWriteHandler.java       | 92 ++++++++++++++++++++
 .../db/CassandraTableWriteHandler.java          | 42 +++++++++
 .../cassandra/db/CassandraWriteContext.java     | 59 +++++++++++++
 .../apache/cassandra/db/ColumnFamilyStore.java  |  7 ++
 src/java/org/apache/cassandra/db/Keyspace.java  | 41 +++++----
 .../cassandra/db/KeyspaceWriteHandler.java      | 29 ++++++
 .../cassandra/db/ReadExecutionController.java   | 23 ++---
 .../apache/cassandra/db/TableWriteHandler.java  | 27 ++++++
 .../org/apache/cassandra/db/WriteContext.java   | 31 +++++++
 src/java/org/apache/cassandra/index/Index.java  |  4 +-
 .../cassandra/index/SecondaryIndexManager.java  | 30 ++++---
 .../index/internal/CassandraIndex.java          | 32 +++----
 .../internal/composites/CompositesSearcher.java | 12 +--
 .../index/internal/keys/KeysSearcher.java       |  7 +-
 .../apache/cassandra/index/sasi/SASIIndex.java  |  4 +-
 .../apache/cassandra/index/CustomIndexTest.java |  5 +-
 .../org/apache/cassandra/index/StubIndex.java   |  2 +-
 .../index/internal/CustomCassandraIndex.java    | 32 +++----
 19 files changed, 387 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d31ed0f5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0935434..5605a77 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Abstract write path for pluggable storage (CASSANDRA-14118)
  * nodetool describecluster should be more informative (CASSANDRA-13853)
  * Compaction performance improvements (CASSANDRA-14261) 
  * Refactor Pair usage to avoid boxing ints/longs (CASSANDRA-14260)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d31ed0f5/src/java/org/apache/cassandra/db/CassandraKeyspaceWriteHandler.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/CassandraKeyspaceWriteHandler.java 
b/src/java/org/apache/cassandra/db/CassandraKeyspaceWriteHandler.java
new file mode 100644
index 0000000..1f1bcdb
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/CassandraKeyspaceWriteHandler.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
+public class CassandraKeyspaceWriteHandler implements KeyspaceWriteHandler
+{
+    private final Keyspace keyspace;
+
+    public CassandraKeyspaceWriteHandler(Keyspace keyspace)
+    {
+        this.keyspace = keyspace;
+    }
+
+    @Override
+    public WriteContext beginWrite(Mutation mutation, boolean makeDurable) 
throws RequestExecutionException
+    {
+        OpOrder.Group group = null;
+        try
+        {
+            group = Keyspace.writeOrder.start();
+
+            // write the mutation to the commitlog and memtables
+            CommitLogPosition position = null;
+            if (makeDurable)
+            {
+                Tracing.trace("Appending to commitlog");
+                position = CommitLog.instance.add(mutation);
+            }
+            return new CassandraWriteContext(group, position);
+        }
+        catch (Throwable t)
+        {
+            if (group != null)
+            {
+                group.close();
+            }
+            throw t;
+        }
+    }
+
+    private WriteContext createEmptyContext()
+    {
+        OpOrder.Group group = null;
+        try
+        {
+            group = Keyspace.writeOrder.start();
+            return new CassandraWriteContext(group, null);
+        }
+        catch (Throwable t)
+        {
+            if (group != null)
+            {
+                group.close();
+            }
+            throw t;
+        }
+    }
+
+    @Override
+    public WriteContext createContextForIndexing()
+    {
+        return createEmptyContext();
+    }
+
+    @Override
+    public WriteContext createContextForRead()
+    {
+        return createEmptyContext();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d31ed0f5/src/java/org/apache/cassandra/db/CassandraTableWriteHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CassandraTableWriteHandler.java 
b/src/java/org/apache/cassandra/db/CassandraTableWriteHandler.java
new file mode 100644
index 0000000..146539c
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/CassandraTableWriteHandler.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.index.transactions.UpdateTransaction;
+import org.apache.cassandra.tracing.Tracing;
+
+public class CassandraTableWriteHandler implements TableWriteHandler
+{
+    private final ColumnFamilyStore cfs;
+
+    public CassandraTableWriteHandler(ColumnFamilyStore cfs)
+    {
+        this.cfs = cfs;
+    }
+
+    @Override
+    @SuppressWarnings("resource")
+    public void write(PartitionUpdate update, WriteContext context, 
UpdateTransaction updateTransaction)
+    {
+        CassandraWriteContext ctx = CassandraWriteContext.fromContext(context);
+        Tracing.trace("Adding to {} memtable", update.metadata().name);
+        cfs.apply(update, updateTransaction, ctx.getGroup(), 
ctx.getPosition());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d31ed0f5/src/java/org/apache/cassandra/db/CassandraWriteContext.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CassandraWriteContext.java 
b/src/java/org/apache/cassandra/db/CassandraWriteContext.java
new file mode 100644
index 0000000..bac1351
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/CassandraWriteContext.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
+public class CassandraWriteContext implements WriteContext
+{
+    private final OpOrder.Group opGroup;
+    private final CommitLogPosition position;
+
+    public CassandraWriteContext(OpOrder.Group opGroup, CommitLogPosition 
position)
+    {
+        Preconditions.checkArgument(opGroup != null);
+        this.opGroup = opGroup;
+        this.position = position;
+    }
+
+    public static CassandraWriteContext fromContext(WriteContext context)
+    {
+        Preconditions.checkArgument(context instanceof CassandraWriteContext);
+        return (CassandraWriteContext) context;
+    }
+
+    public OpOrder.Group getGroup()
+    {
+        return opGroup;
+    }
+
+    public CommitLogPosition getPosition()
+    {
+        return position;
+    }
+
+    @Override
+    public void close()
+    {
+        opGroup.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d31ed0f5/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index bfab6ea..50db418 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -216,6 +216,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
     public final TableMetrics metric;
     public volatile long sampleLatencyNanos;
 
+    private final CassandraTableWriteHandler writeHandler;
     private final CassandraStreamManager streamManager;
 
     private final TableRepairManager repairManager;
@@ -450,6 +451,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
             mbeanName = null;
             oldMBeanName= null;
         }
+        writeHandler = new CassandraTableWriteHandler(this);
         streamManager = new CassandraStreamManager(this);
         repairManager = new CassandraTableRepairManager(this);
     }
@@ -466,6 +468,11 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         }
     }
 
+    public TableWriteHandler getWriteHandler()
+    {
+        return writeHandler;
+    }
+
     public TableStreamManager getStreamManager()
     {
         return streamManager;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d31ed0f5/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java 
b/src/java/org/apache/cassandra/db/Keyspace.java
index 42d43b2..651d156 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -32,8 +32,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.*;
-import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.db.commitlog.CommitLogPosition;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
@@ -91,6 +89,7 @@ public class Keyspace
 
     private volatile AbstractReplicationStrategy replicationStrategy;
     public final ViewManager viewManager;
+    private final KeyspaceWriteHandler writeHandler;
     private volatile ReplicationParams replicationParams;
     private final KeyspaceRepairManager repairManager;
 
@@ -338,7 +337,9 @@ public class Keyspace
             initCf(Schema.instance.getTableMetadataRef(cfm.id), loadSSTables);
         }
         this.viewManager.reload(false);
+
         this.repairManager = new CassandraKeyspaceRepairManager(this);
+        this.writeHandler = new CassandraKeyspaceWriteHandler(this);
     }
 
     private Keyspace(KeyspaceMetadata metadata)
@@ -348,6 +349,7 @@ public class Keyspace
         this.metric = new KeyspaceMetrics(this);
         this.viewManager = new ViewManager(this);
         this.repairManager = new CassandraKeyspaceRepairManager(this);
+        this.writeHandler = new CassandraKeyspaceWriteHandler(this);
     }
 
     public KeyspaceRepairManager getRepairManager()
@@ -424,6 +426,11 @@ public class Keyspace
         }
     }
 
+    public KeyspaceWriteHandler getWriteHandler()
+    {
+        return writeHandler;
+    }
+
     /**
      * adds a cf to internal structures, ends up creating disk files).
      */
@@ -479,16 +486,16 @@ public class Keyspace
      *
      * @param mutation       the row to write.  Must not be modified after 
calling apply, since commitlog append
      *                       may happen concurrently, depending on the CL 
Executor type.
-     * @param writeCommitLog false to disable commitlog append entirely
+     * @param makeDurable    if true, don't return unless write has been made 
durable
      * @param updateIndexes  false to disable index updates (used by 
CollationController "defragmenting")
      * @param isDroppable    true if this should throw WriteTimeoutException 
if it does not acquire lock within write_request_timeout_in_ms
      */
     public void apply(final Mutation mutation,
-                      final boolean writeCommitLog,
+                      final boolean makeDurable,
                       boolean updateIndexes,
                       boolean isDroppable)
     {
-        applyInternal(mutation, writeCommitLog, updateIndexes, isDroppable, 
false, null);
+        applyInternal(mutation, makeDurable, updateIndexes, isDroppable, 
false, null);
     }
 
     /**
@@ -496,13 +503,13 @@ public class Keyspace
      *
      * @param mutation       the row to write.  Must not be modified after 
calling apply, since commitlog append
      *                       may happen concurrently, depending on the CL 
Executor type.
-     * @param writeCommitLog false to disable commitlog append entirely
+     * @param makeDurable    if true, don't return unless write has been made 
durable
      * @param updateIndexes  false to disable index updates (used by 
CollationController "defragmenting")
      * @param isDroppable    true if this should throw WriteTimeoutException 
if it does not acquire lock within write_request_timeout_in_ms
      * @param isDeferrable   true if caller is not waiting for future to 
complete, so that future may be deferred
      */
     private CompletableFuture<?> applyInternal(final Mutation mutation,
-                                               final boolean writeCommitLog,
+                                               final boolean makeDurable,
                                                boolean updateIndexes,
                                                boolean isDroppable,
                                                boolean isDeferrable,
@@ -564,7 +571,7 @@ public class Keyspace
                             // we will re-apply ourself to the queue and try 
again later
                             final CompletableFuture<?> mark = future;
                             StageManager.getStage(Stage.MUTATION).execute(() ->
-                                                                          
applyInternal(mutation, writeCommitLog, true, isDroppable, true, mark)
+                                                                          
applyInternal(mutation, makeDurable, true, isDroppable, true, mark)
                             );
                             return future;
                         }
@@ -604,16 +611,8 @@ public class Keyspace
             }
         }
         int nowInSec = FBUtilities.nowInSeconds();
-        try (OpOrder.Group opGroup = writeOrder.start())
+        try (WriteContext ctx = getWriteHandler().beginWrite(mutation, 
makeDurable))
         {
-            // write the mutation to the commitlog and memtables
-            CommitLogPosition commitLogPosition = null;
-            if (writeCommitLog)
-            {
-                Tracing.trace("Appending to commitlog");
-                commitLogPosition = CommitLog.instance.add(mutation);
-            }
-
             for (PartitionUpdate upd : mutation.getPartitionUpdates())
             {
                 ColumnFamilyStore cfs = 
columnFamilyStores.get(upd.metadata().id);
@@ -629,7 +628,7 @@ public class Keyspace
                     try
                     {
                         Tracing.trace("Creating materialized view mutations 
from base table replica");
-                        
viewManager.forTable(upd.metadata().id).pushViewReplicaUpdates(upd, 
writeCommitLog, baseComplete);
+                        
viewManager.forTable(upd.metadata().id).pushViewReplicaUpdates(upd, 
makeDurable, baseComplete);
                     }
                     catch (Throwable t)
                     {
@@ -640,11 +639,11 @@ public class Keyspace
                     }
                 }
 
-                Tracing.trace("Adding to {} memtable", upd.metadata().name);
                 UpdateTransaction indexTransaction = updateIndexes
-                                                     ? 
cfs.indexManager.newUpdateTransaction(upd, opGroup, nowInSec)
+                                                     ? 
cfs.indexManager.newUpdateTransaction(upd, ctx, nowInSec)
                                                      : UpdateTransaction.NO_OP;
-                cfs.apply(upd, indexTransaction, opGroup, commitLogPosition);
+                cfs.getWriteHandler().write(upd, ctx, indexTransaction);
+
                 if (requiresViewUpdate)
                     baseComplete.set(System.currentTimeMillis());
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d31ed0f5/src/java/org/apache/cassandra/db/KeyspaceWriteHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/KeyspaceWriteHandler.java 
b/src/java/org/apache/cassandra/db/KeyspaceWriteHandler.java
new file mode 100644
index 0000000..19cca72
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/KeyspaceWriteHandler.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import org.apache.cassandra.exceptions.RequestExecutionException;
+
+public interface KeyspaceWriteHandler
+{
+    // mutation can be null if makeDurable is false
+    WriteContext beginWrite(Mutation mutation, boolean makeDurable) throws 
RequestExecutionException;
+    WriteContext createContextForIndexing();
+    WriteContext createContextForRead();
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d31ed0f5/src/java/org/apache/cassandra/db/ReadExecutionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadExecutionController.java 
b/src/java/org/apache/cassandra/db/ReadExecutionController.java
index 9114212..537e3f8 100644
--- a/src/java/org/apache/cassandra/db/ReadExecutionController.java
+++ b/src/java/org/apache/cassandra/db/ReadExecutionController.java
@@ -29,9 +29,9 @@ public class ReadExecutionController implements AutoCloseable
 
     // For index reads
     private final ReadExecutionController indexController;
-    private final OpOrder.Group writeOp;
+    private final WriteContext writeContext;
 
-    private ReadExecutionController(OpOrder.Group baseOp, TableMetadata 
baseMetadata, ReadExecutionController indexController, OpOrder.Group writeOp)
+    private ReadExecutionController(OpOrder.Group baseOp, TableMetadata 
baseMetadata, ReadExecutionController indexController, WriteContext 
writeContext)
     {
         // We can have baseOp == null, but only when empty() is called, in 
which case the controller will never really be used
         // (which validForReadOn should ensure). But if it's not null, we 
should have the proper metadata too.
@@ -39,7 +39,7 @@ public class ReadExecutionController implements AutoCloseable
         this.baseOp = baseOp;
         this.baseMetadata = baseMetadata;
         this.indexController = indexController;
-        this.writeOp = writeOp;
+        this.writeContext = writeContext;
     }
 
     public ReadExecutionController indexReadController()
@@ -47,9 +47,9 @@ public class ReadExecutionController implements AutoCloseable
         return indexController;
     }
 
-    public OpOrder.Group writeOpOrderGroup()
+    public WriteContext getWriteContext()
     {
-        return writeOp;
+        return writeContext;
     }
 
     public boolean validForReadOn(ColumnFamilyStore cfs)
@@ -83,7 +83,8 @@ public class ReadExecutionController implements AutoCloseable
         }
         else
         {
-            OpOrder.Group baseOp = null, writeOp = null;
+            OpOrder.Group baseOp = null;
+            WriteContext writeContext = null;
             ReadExecutionController indexController = null;
             // OpOrder.start() shouldn't fail, but better safe than sorry.
             try
@@ -92,13 +93,13 @@ public class ReadExecutionController implements 
AutoCloseable
                 indexController = new 
ReadExecutionController(indexCfs.readOrdering.start(), indexCfs.metadata(), 
null, null);
                 // TODO: this should perhaps not open and maintain a writeOp 
for the full duration, but instead only *try* to delete stale entries, without 
blocking if there's no room
                 // as it stands, we open a writeOp and keep it open for the 
duration to ensure that should this CF get flushed to make room we don't block 
the reclamation of any room being made
-                writeOp = Keyspace.writeOrder.start();
-                return new ReadExecutionController(baseOp, baseCfs.metadata(), 
indexController, writeOp);
+                writeContext = 
baseCfs.keyspace.getWriteHandler().createContextForRead();
+                return new ReadExecutionController(baseOp, baseCfs.metadata(), 
indexController, writeContext);
             }
             catch (RuntimeException e)
             {
-                // Note that must have writeOp == null since ReadOrderGroup 
ctor can't fail
-                assert writeOp == null;
+                // Note that must have writeContext == null since 
ReadOrderGroup ctor can't fail
+                assert writeContext == null;
                 try
                 {
                     if (baseOp != null)
@@ -142,7 +143,7 @@ public class ReadExecutionController implements 
AutoCloseable
                 }
                 finally
                 {
-                    writeOp.close();
+                    writeContext.close();
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d31ed0f5/src/java/org/apache/cassandra/db/TableWriteHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/TableWriteHandler.java 
b/src/java/org/apache/cassandra/db/TableWriteHandler.java
new file mode 100644
index 0000000..4e47221
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/TableWriteHandler.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.index.transactions.UpdateTransaction;
+
+public interface TableWriteHandler
+{
+    void write(PartitionUpdate update, WriteContext context, UpdateTransaction 
updateTransaction);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d31ed0f5/src/java/org/apache/cassandra/db/WriteContext.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/WriteContext.java 
b/src/java/org/apache/cassandra/db/WriteContext.java
new file mode 100644
index 0000000..102ab50
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/WriteContext.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+/**
+ * Issued by the keyspace write handler and used in the write path (as 
expected), as well as the read path
+ * and some async index building code. In the read and index paths, the write 
context is intended to be used
+ * as a marker for ordering operations. Reads can also end up performing 
writes in some cases, particularly
+ * when correcting secondary indexes.
+ */
+public interface WriteContext extends AutoCloseable
+{
+    @Override
+    void close();
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d31ed0f5/src/java/org/apache/cassandra/index/Index.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/Index.java 
b/src/java/org/apache/cassandra/index/Index.java
index 1b4573d..adfe08c 100644
--- a/src/java/org/apache/cassandra/index/Index.java
+++ b/src/java/org/apache/cassandra/index/Index.java
@@ -377,7 +377,7 @@ public interface Index
      * This can be empty as an update might only contain partition, range and 
row deletions, but
      * the indexer is guaranteed to not get any cells for a column that is not 
part of {@code columns}.
      * @param nowInSec current time of the update operation
-     * @param opGroup operation group spanning the update operation
+     * @param ctx WriteContext spanning the update operation
      * @param transactionType indicates what kind of update is being performed 
on the base data
      *                        i.e. a write time insert/update/delete or the 
result of compaction
      * @return the newly created indexer or {@code null} if the index is not 
interested by the update
@@ -387,7 +387,7 @@ public interface Index
     public Indexer indexerFor(DecoratedKey key,
                               RegularAndStaticColumns columns,
                               int nowInSec,
-                              OpOrder.Group opGroup,
+                              WriteContext ctx,
                               IndexTransaction.Type transactionType);
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d31ed0f5/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java 
b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index b60d811..70aebbd 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -164,10 +164,12 @@ public class SecondaryIndexManager implements 
IndexRegistry, INotificationConsum
      * The underlying column family containing the source data for these 
indexes
      */
     public final ColumnFamilyStore baseCfs;
+    private final Keyspace keyspace;
 
     public SecondaryIndexManager(ColumnFamilyStore baseCfs)
     {
         this.baseCfs = baseCfs;
+        this.keyspace = baseCfs.keyspace;
         baseCfs.getTracker().subscribe(this);
     }
 
@@ -842,7 +844,7 @@ public class SecondaryIndexManager implements 
IndexRegistry, INotificationConsum
             while (!pager.isExhausted())
             {
                 try (ReadExecutionController controller = 
cmd.executionController();
-                     OpOrder.Group writeGroup = Keyspace.writeOrder.start();
+                     WriteContext ctx = 
keyspace.getWriteHandler().createContextForIndexing();
                      UnfilteredPartitionIterator page = 
pager.fetchPageUnfiltered(baseCfs.metadata(), pageSize, controller))
                 {
                     if (!page.hasNext())
@@ -854,7 +856,7 @@ public class SecondaryIndexManager implements 
IndexRegistry, INotificationConsum
                                                              .map(index -> 
index.indexerFor(key,
                                                                                
             partition.columns(),
                                                                                
             nowInSec,
-                                                                               
             writeGroup,
+                                                                               
             ctx,
                                                                                
             IndexTransaction.Type.UPDATE))
                                                              
.filter(Objects::nonNull)
                                                              
.collect(Collectors.toSet());
@@ -1115,7 +1117,7 @@ public class SecondaryIndexManager implements 
IndexRegistry, INotificationConsum
     /**
      * Transaction for updates on the write path.
      */
-    public UpdateTransaction newUpdateTransaction(PartitionUpdate update, 
OpOrder.Group opGroup, int nowInSec)
+    public UpdateTransaction newUpdateTransaction(PartitionUpdate update, 
WriteContext ctx, int nowInSec)
     {
         if (!hasIndexes())
             return UpdateTransaction.NO_OP;
@@ -1124,7 +1126,7 @@ public class SecondaryIndexManager implements 
IndexRegistry, INotificationConsum
                                           .map(i -> 
i.indexerFor(update.partitionKey(),
                                                                  
update.columns(),
                                                                  nowInSec,
-                                                                 opGroup,
+                                                                 ctx,
                                                                  
IndexTransaction.Type.UPDATE))
                                           .filter(Objects::nonNull)
                                           .toArray(Index.Indexer[]::new);
@@ -1141,7 +1143,7 @@ public class SecondaryIndexManager implements 
IndexRegistry, INotificationConsum
                                                           int nowInSec)
     {
         // the check for whether there are any registered indexes is already 
done in CompactionIterator
-        return new IndexGCTransaction(key, regularAndStaticColumns, versions, 
nowInSec, listIndexes());
+        return new IndexGCTransaction(key, regularAndStaticColumns, keyspace, 
versions, nowInSec, listIndexes());
     }
 
     /**
@@ -1154,7 +1156,7 @@ public class SecondaryIndexManager implements 
IndexRegistry, INotificationConsum
         if (!hasIndexes())
             return CleanupTransaction.NO_OP;
 
-        return new CleanupGCTransaction(key, regularAndStaticColumns, 
nowInSec, listIndexes());
+        return new CleanupGCTransaction(key, regularAndStaticColumns, 
keyspace, nowInSec, listIndexes());
     }
 
     /**
@@ -1267,6 +1269,7 @@ public class SecondaryIndexManager implements 
IndexRegistry, INotificationConsum
     {
         private final DecoratedKey key;
         private final RegularAndStaticColumns columns;
+        private final Keyspace keyspace;
         private final int versions;
         private final int nowInSec;
         private final Collection<Index> indexes;
@@ -1275,12 +1278,13 @@ public class SecondaryIndexManager implements 
IndexRegistry, INotificationConsum
 
         private IndexGCTransaction(DecoratedKey key,
                                    RegularAndStaticColumns columns,
-                                   int versions,
+                                   Keyspace keyspace, int versions,
                                    int nowInSec,
                                    Collection<Index> indexes)
         {
             this.key = key;
             this.columns = columns;
+            this.keyspace = keyspace;
             this.versions = versions;
             this.indexes = indexes;
             this.nowInSec = nowInSec;
@@ -1342,11 +1346,11 @@ public class SecondaryIndexManager implements 
IndexRegistry, INotificationConsum
             if (rows == null)
                 return;
 
-            try (OpOrder.Group opGroup = Keyspace.writeOrder.start())
+            try (WriteContext ctx = 
keyspace.getWriteHandler().createContextForIndexing())
             {
                 for (Index index : indexes)
                 {
-                    Index.Indexer indexer = index.indexerFor(key, columns, 
nowInSec, opGroup, Type.COMPACTION);
+                    Index.Indexer indexer = index.indexerFor(key, columns, 
nowInSec, ctx, Type.COMPACTION);
                     if (indexer == null)
                         continue;
 
@@ -1370,6 +1374,7 @@ public class SecondaryIndexManager implements 
IndexRegistry, INotificationConsum
     {
         private final DecoratedKey key;
         private final RegularAndStaticColumns columns;
+        private final Keyspace keyspace;
         private final int nowInSec;
         private final Collection<Index> indexes;
 
@@ -1378,11 +1383,12 @@ public class SecondaryIndexManager implements 
IndexRegistry, INotificationConsum
 
         private CleanupGCTransaction(DecoratedKey key,
                                      RegularAndStaticColumns columns,
-                                     int nowInSec,
+                                     Keyspace keyspace, int nowInSec,
                                      Collection<Index> indexes)
         {
             this.key = key;
             this.columns = columns;
+            this.keyspace = keyspace;
             this.indexes = indexes;
             this.nowInSec = nowInSec;
         }
@@ -1406,11 +1412,11 @@ public class SecondaryIndexManager implements 
IndexRegistry, INotificationConsum
             if (row == null && partitionDelete == null)
                 return;
 
-            try (OpOrder.Group opGroup = Keyspace.writeOrder.start())
+            try (WriteContext ctx = 
keyspace.getWriteHandler().createContextForIndexing())
             {
                 for (Index index : indexes)
                 {
-                    Index.Indexer indexer = index.indexerFor(key, columns, 
nowInSec, opGroup, Type.CLEANUP);
+                    Index.Indexer indexer = index.indexerFor(key, columns, 
nowInSec, ctx, Type.CLEANUP);
                     if (indexer == null)
                         continue;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d31ed0f5/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java 
b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
index c7f3536..9a29c02 100644
--- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
+++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
@@ -340,7 +340,7 @@ public abstract class CassandraIndex implements Index
     public Indexer indexerFor(final DecoratedKey key,
                               final RegularAndStaticColumns columns,
                               final int nowInSec,
-                              final OpOrder.Group opGroup,
+                              final WriteContext ctx,
                               final IndexTransaction.Type transactionType)
     {
         /**
@@ -445,7 +445,7 @@ public abstract class CassandraIndex implements Index
                        clustering,
                        cell,
                        LivenessInfo.withExpirationTime(cell.timestamp(), 
cell.ttl(), cell.localDeletionTime()),
-                       opGroup);
+                       ctx);
             }
 
             private void removeCells(Clustering clustering, Iterable<Cell> 
cells)
@@ -462,7 +462,7 @@ public abstract class CassandraIndex implements Index
                 if (cell == null || !cell.isLive(nowInSec))
                     return;
 
-                delete(key.getKey(), clustering, cell, opGroup, nowInSec);
+                delete(key.getKey(), clustering, cell, ctx, nowInSec);
             }
 
             private void indexPrimaryKey(final Clustering clustering,
@@ -470,10 +470,10 @@ public abstract class CassandraIndex implements Index
                                          final Row.Deletion deletion)
             {
                 if (liveness.timestamp() != LivenessInfo.NO_TIMESTAMP)
-                    insert(key.getKey(), clustering, null, liveness, opGroup);
+                    insert(key.getKey(), clustering, null, liveness, ctx);
 
                 if (!deletion.isLive())
-                    delete(key.getKey(), clustering, deletion.time(), opGroup);
+                    delete(key.getKey(), clustering, deletion.time(), ctx);
             }
 
             private LivenessInfo getPrimaryKeyIndexLiveness(Row row)
@@ -503,14 +503,14 @@ public abstract class CassandraIndex implements Index
      * @param indexKey the partition key in the index table
      * @param indexClustering the clustering in the index table
      * @param deletion deletion timestamp etc
-     * @param opGroup the operation under which to perform the deletion
+     * @param ctx the write context under which to perform the deletion
      */
     public void deleteStaleEntry(DecoratedKey indexKey,
                                  Clustering indexClustering,
                                  DeletionTime deletion,
-                                 OpOrder.Group opGroup)
+                                 WriteContext ctx)
     {
-        doDelete(indexKey, indexClustering, deletion, opGroup);
+        doDelete(indexKey, indexClustering, deletion, ctx);
         logger.trace("Removed index entry for stale value {}", indexKey);
     }
 
@@ -521,14 +521,14 @@ public abstract class CassandraIndex implements Index
                         Clustering clustering,
                         Cell cell,
                         LivenessInfo info,
-                        OpOrder.Group opGroup)
+                        WriteContext ctx)
     {
         DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey,
                                                                clustering,
                                                                cell));
         Row row = BTreeRow.noCellLiveRow(buildIndexClustering(rowKey, 
clustering, cell), info);
         PartitionUpdate upd = partitionUpdate(valueKey, row);
-        indexCfs.apply(upd, UpdateTransaction.NO_OP, opGroup, null);
+        indexCfs.getWriteHandler().write(upd, ctx, UpdateTransaction.NO_OP);
         logger.trace("Inserted entry into index for value {}", valueKey);
     }
 
@@ -538,7 +538,7 @@ public abstract class CassandraIndex implements Index
     private void delete(ByteBuffer rowKey,
                         Clustering clustering,
                         Cell cell,
-                        OpOrder.Group opGroup,
+                        WriteContext ctx,
                         int nowInSec)
     {
         DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey,
@@ -547,7 +547,7 @@ public abstract class CassandraIndex implements Index
         doDelete(valueKey,
                  buildIndexClustering(rowKey, clustering, cell),
                  new DeletionTime(cell.timestamp(), nowInSec),
-                 opGroup);
+                 ctx);
     }
 
     /**
@@ -556,7 +556,7 @@ public abstract class CassandraIndex implements Index
     private void delete(ByteBuffer rowKey,
                         Clustering clustering,
                         DeletionTime deletion,
-                        OpOrder.Group opGroup)
+                        WriteContext ctx)
     {
         DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey,
                                                                clustering,
@@ -564,17 +564,17 @@ public abstract class CassandraIndex implements Index
         doDelete(valueKey,
                  buildIndexClustering(rowKey, clustering, null),
                  deletion,
-                 opGroup);
+                 ctx);
     }
 
     private void doDelete(DecoratedKey indexKey,
                           Clustering indexClustering,
                           DeletionTime deletion,
-                          OpOrder.Group opGroup)
+                          WriteContext ctx)
     {
         Row row = BTreeRow.emptyDeletedRow(indexClustering, 
Row.Deletion.regular(deletion));
         PartitionUpdate upd = partitionUpdate(indexKey, row);
-        indexCfs.apply(upd, UpdateTransaction.NO_OP, opGroup, null);
+        indexCfs.getWriteHandler().write(upd, ctx, UpdateTransaction.NO_OP);
         logger.trace("Removed index entry for value {}", indexKey);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d31ed0f5/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
 
b/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
index f9ae705..9045b3b 100644
--- 
a/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
+++ 
b/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
@@ -170,7 +170,7 @@ public class CompositesSearcher extends 
CassandraIndexSearcher
                         
filterStaleEntries(dataCmd.queryMemtableAndDisk(index.baseCfs, 
executionController),
                                            indexKey.getKey(),
                                            entries,
-                                           
executionController.writeOpOrderGroup(),
+                                           
executionController.getWriteContext(),
                                            command.nowInSec());
 
                     if (dataIter.isEmpty())
@@ -198,20 +198,20 @@ public class CompositesSearcher extends 
CassandraIndexSearcher
         };
     }
 
-    private void deleteAllEntries(final List<IndexEntry> entries, final 
OpOrder.Group writeOp, final int nowInSec)
+    private void deleteAllEntries(final List<IndexEntry> entries, final 
WriteContext ctx, final int nowInSec)
     {
         entries.forEach(entry ->
             index.deleteStaleEntry(entry.indexValue,
                                    entry.indexClustering,
                                    new DeletionTime(entry.timestamp, nowInSec),
-                                   writeOp));
+                                   ctx));
     }
 
     // We assume all rows in dataIter belong to the same partition.
     private UnfilteredRowIterator filterStaleEntries(UnfilteredRowIterator 
dataIter,
                                                      final ByteBuffer 
indexValue,
                                                      final List<IndexEntry> 
entries,
-                                                     final OpOrder.Group 
writeOp,
+                                                     final WriteContext ctx,
                                                      final int nowInSec)
     {
         // collect stale index entries and delete them when we close this 
iterator
@@ -245,7 +245,7 @@ public class CompositesSearcher extends 
CassandraIndexSearcher
                                                                          
dataIter.partitionLevelDeletion(),
                                                                          
dataIter.isReverseOrder());
             }
-            deleteAllEntries(staleEntries, writeOp, nowInSec);
+            deleteAllEntries(staleEntries, ctx, nowInSec);
         }
         else
         {
@@ -292,7 +292,7 @@ public class CompositesSearcher extends 
CassandraIndexSearcher
                 @Override
                 public void onPartitionClose()
                 {
-                    deleteAllEntries(staleEntries, writeOp, nowInSec);
+                    deleteAllEntries(staleEntries, ctx, nowInSec);
                 }
             }
             iteratorToReturn = Transformation.apply(dataIter, new Transform());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d31ed0f5/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java 
b/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
index 16dcd20..8b3a3d2 100644
--- a/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
@@ -31,7 +31,6 @@ import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.index.internal.CassandraIndex;
 import org.apache.cassandra.index.internal.CassandraIndexSearcher;
 import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.utils.concurrent.OpOrder;
 
 public class KeysSearcher extends CassandraIndexSearcher
 {
@@ -100,7 +99,7 @@ public class KeysSearcher extends CassandraIndexSearcher
                     UnfilteredRowIterator dataIter = 
filterIfStale(dataCmd.queryMemtableAndDisk(index.baseCfs, executionController),
                                                                    hit,
                                                                    
indexKey.getKey(),
-                                                                   
executionController.writeOpOrderGroup(),
+                                                                   
executionController.getWriteContext(),
                                                                    
command.nowInSec());
 
                     if (dataIter != null)
@@ -142,7 +141,7 @@ public class KeysSearcher extends CassandraIndexSearcher
     private UnfilteredRowIterator filterIfStale(UnfilteredRowIterator iterator,
                                                 Row indexHit,
                                                 ByteBuffer indexedValue,
-                                                OpOrder.Group writeOp,
+                                                WriteContext ctx,
                                                 int nowInSec)
     {
         Row data = iterator.staticRow();
@@ -152,7 +151,7 @@ public class KeysSearcher extends CassandraIndexSearcher
             
index.deleteStaleEntry(index.getIndexCfs().decorateKey(indexedValue),
                     makeIndexClustering(iterator.partitionKey().getKey(), 
Clustering.EMPTY),
                     new 
DeletionTime(indexHit.primaryKeyLivenessInfo().timestamp(), nowInSec),
-                    writeOp);
+                    ctx);
             iterator.close();
             return null;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d31ed0f5/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java 
b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
index b055e82..50b35fd 100644
--- a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
+++ b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
@@ -242,7 +242,7 @@ public class SASIIndex implements Index, 
INotificationConsumer
     public void validate(PartitionUpdate update) throws InvalidRequestException
     {}
 
-    public Indexer indexerFor(DecoratedKey key, RegularAndStaticColumns 
columns, int nowInSec, OpOrder.Group opGroup, IndexTransaction.Type 
transactionType)
+    public Indexer indexerFor(DecoratedKey key, RegularAndStaticColumns 
columns, int nowInSec, WriteContext context, IndexTransaction.Type 
transactionType)
     {
         return new Indexer()
         {
@@ -258,7 +258,7 @@ public class SASIIndex implements Index, 
INotificationConsumer
             public void insertRow(Row row)
             {
                 if (isNewData())
-                    adjustMemtableSize(index.index(key, row), opGroup);
+                    adjustMemtableSize(index.index(key, row), 
CassandraWriteContext.fromContext(context).getGroup());
             }
 
             public void updateRow(Row oldRow, Row newRow)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d31ed0f5/test/unit/org/apache/cassandra/index/CustomIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/CustomIndexTest.java 
b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
index d14b50d..3036b1a 100644
--- a/test/unit/org/apache/cassandra/index/CustomIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
@@ -1048,13 +1048,14 @@ public class CustomIndexTest extends CQLTester
         public Indexer indexerFor(final DecoratedKey key,
                                   RegularAndStaticColumns columns,
                                   int nowInSec,
-                                  OpOrder.Group opGroup,
+                                  WriteContext ctx,
                                   IndexTransaction.Type transactionType)
         {
+            CassandraWriteContext cassandraWriteContext = 
(CassandraWriteContext) ctx;
             if (readOrderingAtStart == null)
                 readOrderingAtStart = baseCfs.readOrdering.getCurrent();
 
-            writeGroups.add(opGroup);
+            writeGroups.add(cassandraWriteContext.getGroup());
 
             return new Indexer()
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d31ed0f5/test/unit/org/apache/cassandra/index/StubIndex.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/StubIndex.java 
b/test/unit/org/apache/cassandra/index/StubIndex.java
index 00c47d1..a351cce 100644
--- a/test/unit/org/apache/cassandra/index/StubIndex.java
+++ b/test/unit/org/apache/cassandra/index/StubIndex.java
@@ -100,7 +100,7 @@ public class StubIndex implements Index
     public Indexer indexerFor(final DecoratedKey key,
                               RegularAndStaticColumns columns,
                               int nowInSec,
-                              OpOrder.Group opGroup,
+                              WriteContext ctx,
                               IndexTransaction.Type transactionType)
     {
         return new Indexer()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d31ed0f5/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java 
b/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
index 028bbd0..f007c09 100644
--- a/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
+++ b/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
@@ -287,7 +287,7 @@ public class CustomCassandraIndex implements Index
     public Indexer indexerFor(final DecoratedKey key,
                               final RegularAndStaticColumns columns,
                               final int nowInSec,
-                              final OpOrder.Group opGroup,
+                              final WriteContext ctx,
                               final IndexTransaction.Type transactionType)
     {
         if (!isPrimaryKeyIndex() && !columns.contains(indexedColumn))
@@ -377,7 +377,7 @@ public class CustomCassandraIndex implements Index
                        clustering,
                        cell,
                        LivenessInfo.withExpirationTime(cell.timestamp(), 
cell.ttl(), cell.localDeletionTime()),
-                       opGroup);
+                       ctx);
             }
 
             private void removeCells(Clustering clustering, Iterable<Cell> 
cells)
@@ -394,7 +394,7 @@ public class CustomCassandraIndex implements Index
                 if (cell == null || !cell.isLive(nowInSec))
                     return;
 
-                delete(key.getKey(), clustering, cell, opGroup, nowInSec);
+                delete(key.getKey(), clustering, cell, ctx, nowInSec);
             }
 
             private void indexPrimaryKey(final Clustering clustering,
@@ -402,10 +402,10 @@ public class CustomCassandraIndex implements Index
                                          final Row.Deletion deletion)
             {
                 if (liveness.timestamp() != LivenessInfo.NO_TIMESTAMP)
-                    insert(key.getKey(), clustering, null, liveness, opGroup);
+                    insert(key.getKey(), clustering, null, liveness, ctx);
 
                 if (!deletion.isLive())
-                    delete(key.getKey(), clustering, deletion.time(), opGroup);
+                    delete(key.getKey(), clustering, deletion.time(), ctx);
             }
 
             private LivenessInfo getPrimaryKeyIndexLiveness(Row row)
@@ -435,14 +435,14 @@ public class CustomCassandraIndex implements Index
      * @param indexKey the partition key in the index table
      * @param indexClustering the clustering in the index table
      * @param deletion deletion timestamp etc
-     * @param opGroup the operation under which to perform the deletion
+     * @param ctx the context under which to perform the deletion
      */
     public void deleteStaleEntry(DecoratedKey indexKey,
                                  Clustering indexClustering,
                                  DeletionTime deletion,
-                                 OpOrder.Group opGroup)
+                                 WriteContext ctx)
     {
-        doDelete(indexKey, indexClustering, deletion, opGroup);
+        doDelete(indexKey, indexClustering, deletion, ctx);
         logger.debug("Removed index entry for stale value {}", indexKey);
     }
 
@@ -453,14 +453,14 @@ public class CustomCassandraIndex implements Index
                         Clustering clustering,
                         Cell cell,
                         LivenessInfo info,
-                        OpOrder.Group opGroup)
+                        WriteContext ctx)
     {
         DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey,
                                                                clustering,
                                                                cell));
         Row row = BTreeRow.noCellLiveRow(buildIndexClustering(rowKey, 
clustering, cell), info);
         PartitionUpdate upd = partitionUpdate(valueKey, row);
-        indexCfs.apply(upd, UpdateTransaction.NO_OP, opGroup, null);
+        indexCfs.getWriteHandler().write(upd, ctx, UpdateTransaction.NO_OP);
         logger.debug("Inserted entry into index for value {}", valueKey);
     }
 
@@ -470,7 +470,7 @@ public class CustomCassandraIndex implements Index
     private void delete(ByteBuffer rowKey,
                         Clustering clustering,
                         Cell cell,
-                        OpOrder.Group opGroup,
+                        WriteContext ctx,
                         int nowInSec)
     {
         DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey,
@@ -479,7 +479,7 @@ public class CustomCassandraIndex implements Index
         doDelete(valueKey,
                  buildIndexClustering(rowKey, clustering, cell),
                  new DeletionTime(cell.timestamp(), nowInSec),
-                 opGroup);
+                 ctx);
     }
 
     /**
@@ -488,7 +488,7 @@ public class CustomCassandraIndex implements Index
     private void delete(ByteBuffer rowKey,
                         Clustering clustering,
                         DeletionTime deletion,
-                        OpOrder.Group opGroup)
+                        WriteContext ctx)
     {
         DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey,
                                                                clustering,
@@ -496,17 +496,17 @@ public class CustomCassandraIndex implements Index
         doDelete(valueKey,
                  buildIndexClustering(rowKey, clustering, null),
                  deletion,
-                 opGroup);
+                 ctx);
     }
 
     private void doDelete(DecoratedKey indexKey,
                           Clustering indexClustering,
                           DeletionTime deletion,
-                          OpOrder.Group opGroup)
+                          WriteContext ctx)
     {
         Row row = BTreeRow.emptyDeletedRow(indexClustering, 
Row.Deletion.regular(deletion));
         PartitionUpdate upd = partitionUpdate(indexKey, row);
-        indexCfs.apply(upd, UpdateTransaction.NO_OP, opGroup, null);
+        indexCfs.getWriteHandler().write(upd, ctx, UpdateTransaction.NO_OP);
         logger.debug("Removed index entry for value {}", indexKey);
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to