Repository: cassandra
Updated Branches:
  refs/heads/trunk 74290df23 -> 0b68b6dd7


Make OpOrder AutoCloseable

Patch by benedict, reviewed by marcuse for CASSANDRA-6901.


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/269f8105
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/269f8105
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/269f8105

Branch: refs/heads/trunk
Commit: 269f81052e42d36f9a3bee684464543b7074b6b9
Parents: 53e2212
Author: belliottsmith <git...@sub.laerad.com>
Authored: Fri Mar 21 15:04:36 2014 +0100
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Fri Mar 21 15:05:39 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 src/java/org/apache/cassandra/db/Keyspace.java  | 27 ++++------
 .../db/commitlog/CommitLogSegment.java          | 13 ++---
 .../db/compaction/CompactionManager.java        | 54 ++++++++++++++------
 .../db/index/SecondaryIndexManager.java         | 34 ++++++++----
 .../db/index/composites/CompositesIndex.java    |  7 +--
 .../cassandra/db/index/keys/KeysSearcher.java   |  9 +---
 .../cassandra/utils/concurrent/OpOrder.java     | 21 +++-----
 .../cassandra/concurrent/LongOpOrderTest.java   | 24 ++++-----
 9 files changed, 99 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/269f8105/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7e2ed4d..2949b6a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -27,6 +27,7 @@
  * Proper compare function for CollectionType (CASSANDRA-6783)
  * Update native server to Netty 4 (CASSANDRA-6236)
  * Fix off-by-one error in stress (CASSANDRA-6883)
+ * Make OpOrder AutoCloseable (CASSANDRA-6901)
 Merged from 2.0:
  * Add uuid() function (CASSANDRA-6473)
  * Omit tombstones from schema digests (CASSANDRA-6862)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/269f8105/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index 436aca0..fabd433 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -19,16 +19,19 @@ package org.apache.cassandra.db;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Future;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
-
-import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,6 +40,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
@@ -45,6 +49,7 @@ import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.pager.QueryPagers;
 import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.concurrent.OpOrder;
 
 /**
  * It represents a Keyspace.
@@ -338,8 +343,7 @@ public class Keyspace
      */
     public void apply(Mutation mutation, boolean writeCommitLog, boolean updateIndexes)
     {
-        final OpOrder.Group opGroup = writeOrder.start();
-        try
+        try (OpOrder.Group opGroup = writeOrder.start())
         {
             // write the mutation to the commitlog and memtables
             final ReplayPosition replayPosition;
@@ -370,10 +374,6 @@ public class Keyspace
                 cfs.apply(key, cf, updater, opGroup, replayPosition);
             }
         }
-        finally
-        {
-            opGroup.finishOne();
-        }
     }
 
     public AbstractReplicationStrategy getReplicationStrategy()
@@ -391,8 +391,7 @@ public class Keyspace
         if (logger.isDebugEnabled())
             logger.debug("Indexing row {} ", cfs.metadata.getKeyValidator().getString(key.key));
 
-        final OpOrder.Group opGroup = cfs.keyspace.writeOrder.start();
-        try
+        try (OpOrder.Group opGroup = cfs.keyspace.writeOrder.start())
         {
             Set<SecondaryIndex> indexes = cfs.indexManager.getIndexesByNames(idxNames);
 
@@ -409,10 +408,6 @@ public class Keyspace
                 cfs.indexManager.indexRow(key.key, cf2, opGroup);
             }
         }
-        finally
-        {
-            opGroup.finishOne();
-        }
     }
 
     public List<Future<?>> flush()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/269f8105/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index cabd886..9436a5a 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -178,7 +178,7 @@ public class CommitLogSegment
             int position = allocate(size);
             if (position < 0)
             {
-                commandOrder.finishOne();
+                commandOrder.close();
                 return false;
             }
             alloc.buffer = (ByteBuffer) buffer.duplicate().position(position).limit(position + size);
@@ -190,7 +190,7 @@ public class CommitLogSegment
         }
         catch (Throwable t)
         {
-            commandOrder.finishOne();
+            commandOrder.close();
             throw t;
         }
     }
@@ -216,8 +216,7 @@ public class CommitLogSegment
         // this actually isn't strictly necessary, as currently all calls to discardUnusedTail occur within a block
         // already protected by this OpOrdering, but to prevent future potential mistakes, we duplicate the protection here
         // so that the contract between discardUnusedTail() and sync() is more explicit.
-        OpOrder.Group group = appendOrder.start();
-        try
+        try (OpOrder.Group group = appendOrder.start())
         {
             while (true)
             {
@@ -233,10 +232,6 @@ public class CommitLogSegment
                 }
             }
         }
-        finally
-        {
-            group.finishOne();
-        }
     }
 
     /**
@@ -581,7 +576,7 @@ public class CommitLogSegment
         // but must not be called more than once
         void markWritten()
         {
-            appendOp.finishOne();
+            appendOp.close();
         }
 
         void awaitDiskSync()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/269f8105/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index e28cfef..20fc747 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -20,19 +20,37 @@ package org.apache.cassandra.db.compaction;
 import java.io.File;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 import javax.management.openmbean.OpenDataException;
 import javax.management.openmbean.TabularData;
 
 import com.google.common.base.Throwables;
-import com.google.common.collect.*;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ConcurrentHashMultiset;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.RateLimiter;
-
-import org.apache.cassandra.utils.concurrent.OpOrder;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,20 +60,31 @@ import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.Cell;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.OnDiskAtom;
+import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.compaction.CompactionInfo.Holder;
 import org.apache.cassandra.db.index.SecondaryIndexBuilder;
 import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.CompactionMetrics;
 import org.apache.cassandra.repair.Validator;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.CloseableIterator;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.WrappedRunnable;
+import org.apache.cassandra.utils.concurrent.OpOrder;
 
 /**
  * A singleton which manages a private executor of ongoing compactions.
@@ -831,15 +860,10 @@ public class CompactionManager implements CompactionManagerMBean
                 if (indexedColumnsInRow != null && !indexedColumnsInRow.isEmpty())
                 {
                     // acquire memtable lock here because secondary index deletion may cause a race. See CASSANDRA-3712
-                    final OpOrder.Group opGroup = cfs.keyspace.writeOrder.start();
-                    try
+                    try (OpOrder.Group opGroup = cfs.keyspace.writeOrder.start())
                     {
                         cfs.indexManager.deleteFromIndexes(row.getKey(), indexedColumnsInRow, opGroup);
                     }
-                    finally
-                    {
-                        opGroup.finishOne();
-                    }
                 }
                 return null;
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/269f8105/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index c628a74..a97007e 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -18,10 +18,20 @@
 package org.apache.cassandra.db.index;
 
 import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.*;
-
-import org.apache.cassandra.utils.concurrent.OpOrder;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.Future;
 
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
@@ -29,7 +39,13 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.IndexType;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.Cell;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.IndexExpression;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.filter.ExtendedFilter;
@@ -37,6 +53,7 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.OpOrder;
 
 /**
  * Manages all the indexes associated with a given CFS
@@ -646,15 +663,10 @@ public class SecondaryIndexManager
             {
                 if (index instanceof PerColumnSecondaryIndex)
                 {
-                    OpOrder.Group opGroup = baseCfs.keyspace.writeOrder.start();
-                    try
+                    try (OpOrder.Group opGroup = baseCfs.keyspace.writeOrder.start())
                     {
                         ((PerColumnSecondaryIndex) index).delete(key.key, cell, opGroup);
                     }
-                    finally
-                    {
-                        opGroup.finishOne();
-                    }
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/269f8105/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
index ec1118b..52b76ea 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
@@ -132,8 +132,7 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
     public void delete(IndexedEntry entry)
     {
         // start a mini-transaction for this delete, to ensure safe memtable updates
-        OpOrder.Group opGroup = baseCfs.keyspace.writeOrder.start();
-        try
+        try (OpOrder.Group opGroup = baseCfs.keyspace.writeOrder.start())
         {
             int localDeletionTime = (int) (System.currentTimeMillis() / 1000);
             ColumnFamily cfi = ArrayBackedSortedColumns.factory.create(indexCfs.metadata);
@@ -142,10 +141,6 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
             if (logger.isDebugEnabled())
                 logger.debug("removed index entry for cleaned-up value {}:{}", entry.indexValue, cfi);
         }
-        finally
-        {
-            opGroup.finishOne();
-        }
     }
 
     protected AbstractType<?> getExpressionComparator()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/269f8105/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
index ce5fe30..af780d3 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
@@ -187,14 +187,9 @@ public class KeysSearcher extends SecondaryIndexSearcher
                         {
                             // delete the index entry w/ its own timestamp
                             Cell dummyCell = new Cell(primaryColumn, indexKey.key, cell.timestamp());
-                            OpOrder.Group opGroup = baseCfs.keyspace.writeOrder.start();
-                            try
+                            try (OpOrder.Group opGroup = baseCfs.keyspace.writeOrder.start())
                             {
-                                ((PerColumnSecondaryIndex)index).delete(dk.key, dummyCell, opGroup);
-                            }
-                            finally
-                            {
-                                opGroup.finishOne();
+                                ((PerColumnSecondaryIndex) index).delete(dk.key, dummyCell, opGroup);
                             }
                             continue;
                         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/269f8105/src/java/org/apache/cassandra/utils/concurrent/OpOrder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/OpOrder.java b/src/java/org/apache/cassandra/utils/concurrent/OpOrder.java
index d42f996..bc43e10 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/OpOrder.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/OpOrder.java
@@ -64,18 +64,13 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
         public void produce()
         {
-            Group opGroup = order.start();
-            try
+            try (Group opGroup = order.start())
             {
                 SharedState s = state;
                 while (s.barrier != null && !s.barrier.isAfter(opGroup))
                     s = s.getReplacement();
                 s.doProduceWork();
             }
-            finally
-            {
-                opGroup.finishOne();
-            }
         }
     }
  * </pre>
@@ -97,7 +92,7 @@ public class OpOrder
 
     /**
      * Start an operation against this OpOrder.
-     * Once the operation is completed Ordered.finishOne() MUST be called EXACTLY once for this operation.
+     * Once the operation is completed Ordered.close() MUST be called EXACTLY once for this operation.
      *
      * @return the Ordered instance that manages this OpOrder
      */
@@ -131,17 +126,17 @@ public class OpOrder
 
     /**
      * Represents a group of identically ordered operations, i.e. all operations started in the interval between
-     * two barrier issuances. For each register() call this is returned, finishOne() must be called exactly once.
+     * two barrier issuances. For each register() call this is returned, close() must be called exactly once.
      * It should be treated like taking a lock().
      */
-    public static final class Group implements Comparable<Group>
+    public static final class Group implements Comparable<Group>, AutoCloseable
     {
         /**
          * In general this class goes through the following stages:
-         * 1) LIVE:      many calls to register() and finishOne()
+         * 1) LIVE:      many calls to register() and close()
          * 2) FINISHING: a call to expire() (after a barrier issue), means calls to register() will now fail,
          *               and we are now 'in the past' (new operations will be started against a new Ordered)
-         * 3) FINISHED:  once the last finishOne() is called, this Ordered is done. We call unlink().
+         * 3) FINISHED:  once the last close() is called, this Ordered is done. We call unlink().
          * 4) ZOMBIE:    all our operations are finished, but some operations against an earlier Ordered are still
          *               running, or tidying up, so unlink() fails to remove us
          * 5) COMPLETE:  all operations started on or before us are FINISHED (and COMPLETE), so we are unlinked
@@ -176,7 +171,7 @@ public class OpOrder
         }
 
         // prevents any further operations starting against this Ordered instance
-        // if there are no running operations, calls unlink; otherwise, we let the last op to finishOne call it.
+        // if there are no running operations, calls unlink; otherwise, we let the last op to close call it.
         // this means issue() won't have to block for ops to finish.
         private void expire()
         {
@@ -212,7 +207,7 @@ public class OpOrder
          * To be called exactly once for each register() call this object is returned for, indicating the operation
          * is complete
          */
-        public void finishOne()
+        public void close()
         {
             while (true)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/269f8105/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java b/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java
index ec00d48..d7105df 100644
--- a/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java
+++ b/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java
@@ -21,14 +21,6 @@ package org.apache.cassandra.concurrent;
  */
 
 
-import org.apache.cassandra.utils.concurrent.OpOrder;
-
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-import org.junit.*;
-import org.slf4j.*;
-
-import static org.junit.Assert.*;
-
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -37,6 +29,15 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
+import static org.junit.Assert.assertTrue;
+
 // TODO: we don't currently test SAFE functionality at all!
 // TODO: should also test markBlocking and SyncOrdered
 public class LongOpOrderTest
@@ -202,8 +203,7 @@ public class LongOpOrderTest
                 while (true)
                 {
                     AtomicInteger c;
-                    OpOrder.Group opGroup = order.start();
-                    try
+                    try (OpOrder.Group opGroup = order.start())
                     {
                         if (null == (c = count.get(opGroup)))
                         {
@@ -215,10 +215,6 @@ public class LongOpOrderTest
                         while (!s.accept(opGroup))
                             s = s.replacement;
                     }
-                    finally
-                    {
-                        opGroup.finishOne();
-                    }
                 }
             }
         }

Reply via email to