Repository: cassandra
Updated Branches:
  refs/heads/trunk bf85616f6 -> edade5ac3


Fixes race during construction of commit log:
- Moves thread start outside of constructor for all commit log classes.
- Removes all references to CommitLog.instance from commitlog package.

patch by Branimir Lambov; reviewed by tjake for CASSANDRA-10049


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/01a11fd2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/01a11fd2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/01a11fd2

Branch: refs/heads/trunk
Commit: 01a11fd2626d57bf0c8d0bce1e43060017592896
Parents: e1086bc
Author: Branimir Lambov <branimir.lam...@datastax.com>
Authored: Wed Aug 12 11:59:41 2015 +0300
Committer: T Jake Luciani <j...@apache.org>
Committed: Fri Aug 28 09:43:34 2015 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/commitlog/AbstractCommitLogService.java  |  2 +-
 .../cassandra/db/commitlog/CommitLog.java       | 24 ++++++++-----
 .../db/commitlog/CommitLogArchiver.java         | 38 +++++++++++++-------
 .../db/commitlog/CommitLogReplayer.java         | 14 ++++----
 .../db/commitlog/CommitLogSegmentManager.java   | 13 ++-----
 .../db/commitlog/CommitLogStressTest.java       |  9 ++---
 .../unit/org/apache/cassandra/SchemaLoader.java |  2 +-
 .../db/commitlog/CommitLogTestReplayer.java     | 10 +++---
 .../db/commitlog/CommitLogUpgradeTest.java      |  2 +-
 10 files changed, 65 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/01a11fd2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6cffd18..c215a50 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.1
+ * Fix race during construction of commit log (CASSANDRA-10049)
  * Fix LeveledCompactionStrategyTest (CASSANDRA-9757)
  * Fix broken UnbufferedDataOutputStreamPlus.writeUTF (CASSANDRA-10203)
  * (cqlsh) add CLEAR command (CASSANDRA-10086)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01a11fd2/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java 
b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index fa981a3..702ace5 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -186,7 +186,7 @@ public abstract class AbstractCommitLogService
     /**
      * FOR TESTING ONLY
      */
-    public void startUnsafe()
+    public void restartUnsafe()
     {
         while (haveWork.availablePermits() < 1)
             haveWork.release();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01a11fd2/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java 
b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index f23ebae..63005d7 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -76,7 +76,7 @@ public class CommitLog implements CommitLogMBean
 
     static private CommitLog construct()
     {
-        CommitLog log = new 
CommitLog(DatabaseDescriptor.getCommitLogLocation(), new CommitLogArchiver());
+        CommitLog log = new 
CommitLog(DatabaseDescriptor.getCommitLogLocation(), 
CommitLogArchiver.construct());
 
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         try
@@ -87,7 +87,7 @@ public class CommitLog implements CommitLogMBean
         {
             throw new RuntimeException(e);
         }
-        return log;
+        return log.start();
     }
 
     @VisibleForTesting
@@ -107,12 +107,18 @@ public class CommitLog implements CommitLogMBean
                 : new PeriodicCommitLogService(this);
 
         allocator = new CommitLogSegmentManager(this);
-        executor.start();
 
         // register metrics
         metrics.attach(executor, allocator);
     }
 
+    CommitLog start()
+    {
+        executor.start();
+        allocator.start();
+        return this;
+    }
+
     /**
      * Perform recovery on commit logs located in the directory specified by 
the config file.
      *
@@ -179,7 +185,7 @@ public class CommitLog implements CommitLogMBean
      */
     public int recover(File... clogs) throws IOException
     {
-        CommitLogReplayer recovery = CommitLogReplayer.create();
+        CommitLogReplayer recovery = CommitLogReplayer.construct(this);
         recovery.recover(clogs);
         return recovery.blockForWrites();
     }
@@ -189,7 +195,7 @@ public class CommitLog implements CommitLogMBean
      */
     public void recover(String path) throws IOException
     {
-        CommitLogReplayer recovery = CommitLogReplayer.create();
+        CommitLogReplayer recovery = CommitLogReplayer.construct(this);
         recovery.recover(new File(path), false);
         recovery.blockForWrites();
     }
@@ -411,7 +417,7 @@ public class CommitLog implements CommitLogMBean
     public int resetUnsafe(boolean deleteSegments) throws IOException
     {
         stopUnsafe(deleteSegments);
-        return startUnsafe();
+        return restartUnsafe();
     }
 
     /**
@@ -434,10 +440,10 @@ public class CommitLog implements CommitLogMBean
     /**
      * FOR TESTING PURPOSES.  See CommitLogAllocator
      */
-    public int startUnsafe() throws IOException
+    public int restartUnsafe() throws IOException
     {
-        allocator.startUnsafe();
-        executor.startUnsafe();
+        allocator.start();
+        executor.restartUnsafe();
         return recover();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01a11fd2/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java 
b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
index 02072de..4c615e0 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
@@ -53,33 +53,45 @@ public class CommitLogArchiver
     }
 
     public final Map<String, Future<?>> archivePending = new 
ConcurrentHashMap<String, Future<?>>();
-    private final ExecutorService executor = new 
JMXEnabledThreadPoolExecutor("CommitLogArchiver");
+    private final ExecutorService executor;
     final String archiveCommand;
     final String restoreCommand;
     final String restoreDirectories;
     public long restorePointInTime;
     public final TimeUnit precision;
 
-    public CommitLogArchiver()
+    public CommitLogArchiver(String archiveCommand, String restoreCommand, 
String restoreDirectories,
+            long restorePointInTime, TimeUnit precision)
+    {
+        this.archiveCommand = archiveCommand;
+        this.restoreCommand = restoreCommand;
+        this.restoreDirectories = restoreDirectories;
+        this.restorePointInTime = restorePointInTime;
+        this.precision = precision;
+        executor = !Strings.isNullOrEmpty(archiveCommand) ? new 
JMXEnabledThreadPoolExecutor("CommitLogArchiver") : null;
+    }
+
+    public static CommitLogArchiver disabled()
+    {
+        return new CommitLogArchiver(null, null, null, Long.MAX_VALUE, 
TimeUnit.MICROSECONDS);
+    }
+
+    public static CommitLogArchiver construct()
     {
         Properties commitlog_commands = new Properties();
-        try (InputStream stream = 
getClass().getClassLoader().getResourceAsStream("commitlog_archiving.properties"))
+        try (InputStream stream = 
CommitLogArchiver.class.getClassLoader().getResourceAsStream("commitlog_archiving.properties"))
         {
             if (stream == null)
             {
                 logger.debug("No commitlog_archiving properties found; archive 
+ pitr will be disabled");
-                archiveCommand = null;
-                restoreCommand = null;
-                restoreDirectories = null;
-                restorePointInTime = Long.MAX_VALUE;
-                precision = TimeUnit.MICROSECONDS;
+                return disabled();
             }
             else
             {
                 commitlog_commands.load(stream);
-                archiveCommand = 
commitlog_commands.getProperty("archive_command");
-                restoreCommand = 
commitlog_commands.getProperty("restore_command");
-                restoreDirectories = 
commitlog_commands.getProperty("restore_directories");
+                String archiveCommand = 
commitlog_commands.getProperty("archive_command");
+                String restoreCommand = 
commitlog_commands.getProperty("restore_command");
+                String restoreDirectories = 
commitlog_commands.getProperty("restore_directories");
                 if (restoreDirectories != null && 
!restoreDirectories.isEmpty())
                 {
                     for (String dir : restoreDirectories.split(DELIMITER))
@@ -95,7 +107,8 @@ public class CommitLogArchiver
                     }
                 }
                 String targetTime = 
commitlog_commands.getProperty("restore_point_in_time");
-                precision = 
TimeUnit.valueOf(commitlog_commands.getProperty("precision", "MICROSECONDS"));
+                TimeUnit precision = 
TimeUnit.valueOf(commitlog_commands.getProperty("precision", "MICROSECONDS"));
+                long restorePointInTime;
                 try
                 {
                     restorePointInTime = Strings.isNullOrEmpty(targetTime) ? 
Long.MAX_VALUE : format.parse(targetTime).getTime();
@@ -104,6 +117,7 @@ public class CommitLogArchiver
                 {
                     throw new RuntimeException("Unable to parse restore target 
time", e);
                 }
+                return new CommitLogArchiver(archiveCommand, restoreCommand, 
restoreDirectories, restorePointInTime, precision);
             }
         }
         catch (IOException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01a11fd2/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java 
b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index af515d2..389b111 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -77,8 +77,9 @@ public class CommitLogReplayer
     private byte[] uncompressedBuffer;
 
     private final ReplayFilter replayFilter;
+    private final CommitLogArchiver archiver;
 
-    CommitLogReplayer(ReplayPosition globalPosition, Map<UUID, ReplayPosition> 
cfPositions, ReplayFilter replayFilter)
+    CommitLogReplayer(CommitLog commitLog, ReplayPosition globalPosition, 
Map<UUID, ReplayPosition> cfPositions, ReplayFilter replayFilter)
     {
         this.keyspacesRecovered = new NonBlockingHashSet<Keyspace>();
         this.futures = new ArrayList<Future<?>>();
@@ -91,9 +92,10 @@ public class CommitLogReplayer
         this.cfPositions = cfPositions;
         this.globalPosition = globalPosition;
         this.replayFilter = replayFilter;
+        this.archiver = commitLog.archiver;
     }
 
-    public static CommitLogReplayer create()
+    public static CommitLogReplayer construct(CommitLog commitLog)
     {
         // compute per-CF and global replay positions
         Map<UUID, ReplayPosition> cfPositions = new HashMap<UUID, 
ReplayPosition>();
@@ -113,7 +115,7 @@ public class CommitLogReplayer
                 // Point in time restore is taken to mean that the tables need 
to be recovered even if they were
                 // deleted at a later point in time. Any truncation record 
after that point must thus be cleared prior
                 // to recovery (CASSANDRA-9195).
-                long restoreTime = 
CommitLog.instance.archiver.restorePointInTime;
+                long restoreTime = commitLog.archiver.restorePointInTime;
                 long truncatedTime = 
SystemKeyspace.getTruncatedAt(cfs.metadata.cfId);
                 if (truncatedTime > restoreTime)
                 {
@@ -135,7 +137,7 @@ public class CommitLogReplayer
         }
         ReplayPosition globalPosition = 
replayPositionOrdering.min(cfPositions.values());
         logger.debug("Global replay position is {} from columnfamilies {}", 
globalPosition, FBUtilities.toString(cfPositions));
-        return new CommitLogReplayer(globalPosition, cfPositions, 
replayFilter);
+        return new CommitLogReplayer(commitLog, globalPosition, cfPositions, 
replayFilter);
     }
 
     public void recover(File[] clogs) throws IOException
@@ -604,11 +606,11 @@ public class CommitLogReplayer
 
     protected boolean pointInTimeExceeded(Mutation fm)
     {
-        long restoreTarget = CommitLog.instance.archiver.restorePointInTime;
+        long restoreTarget = archiver.restorePointInTime;
 
         for (ColumnFamily families : fm.getColumnFamilies())
         {
-            if 
(CommitLog.instance.archiver.precision.toMillis(families.maxTimestamp()) > 
restoreTarget)
+            if (archiver.precision.toMillis(families.maxTimestamp()) > 
restoreTarget)
                 return true;
         }
         return false;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01a11fd2/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java 
b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
index 3f00e97..5918474 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
@@ -95,13 +95,12 @@ public class CommitLogSegmentManager
     private volatile boolean run = true;
     private final CommitLog commitLog;
 
-    public CommitLogSegmentManager(final CommitLog commitLog)
+    CommitLogSegmentManager(final CommitLog commitLog)
     {
         this.commitLog = commitLog;
-        start();
     }
 
-    private void start()
+    void start()
     {
         // The run loop for the manager thread
         Runnable runnable = new WrappedRunnable()
@@ -526,14 +525,6 @@ public class CommitLogSegmentManager
     }
 
     /**
-     * Starts CL, for testing purposes. DO NOT USE THIS OUTSIDE OF TESTS.
-     */
-    public void startUnsafe()
-    {
-        start();
-    }
-
-    /**
      * Initiates the shutdown process for the management thread.
      */
     public void shutdown()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01a11fd2/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
----------------------------------------------------------------------
diff --git 
a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java 
b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
index 5897dec..1b4edee 100644
--- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
+++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
@@ -111,6 +111,7 @@ public class CommitLogStressTest
             initialize();
 
             CommitLogStressTest tester = new CommitLogStressTest();
+            tester.cleanDir();
             tester.testFixedSize();
         }
         catch (Throwable e)
@@ -206,7 +207,7 @@ public class CommitLogStressTest
             for (CommitLogSync sync : CommitLogSync.values())
             {
                 DatabaseDescriptor.setCommitLogSync(sync);
-                CommitLog commitLog = new CommitLog(location, 
CommitLog.instance.archiver);
+                CommitLog commitLog = new CommitLog(location, 
CommitLogArchiver.disabled()).start();
                 testLog(commitLog);
             }
         }
@@ -271,7 +272,7 @@ public class CommitLogStressTest
 
         System.out.print("Stopped. Replaying... ");
         System.out.flush();
-        Replayer repl = new Replayer();
+        Replayer repl = new Replayer(commitLog);
         File[] files = new File(location).listFiles();
         repl.recover(files);
 
@@ -442,9 +443,9 @@ public class CommitLogStressTest
 
     class Replayer extends CommitLogReplayer
     {
-        Replayer()
+        Replayer(CommitLog log)
         {
-            super(discardedPos, null, ReplayFilter.create());
+            super(log, discardedPos, null, ReplayFilter.create());
         }
 
         int hash = 0;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01a11fd2/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java 
b/test/unit/org/apache/cassandra/SchemaLoader.java
index 46f4a9a..a7cf7b4 100644
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@ -463,7 +463,7 @@ public class SchemaLoader
         mkdirs();
         cleanup();
         mkdirs();
-        CommitLog.instance.startUnsafe();
+        CommitLog.instance.restartUnsafe();
     }
 
     public static void cleanup()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01a11fd2/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java 
b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
index 4ad49ec..c377a21 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
@@ -40,21 +40,21 @@ public class CommitLogTestReplayer extends CommitLogReplayer
     {
         CommitLog.instance.sync(true);
 
-        CommitLogTestReplayer replayer = new CommitLogTestReplayer(processor);
+        CommitLogTestReplayer replayer = new 
CommitLogTestReplayer(CommitLog.instance, processor);
         File commitLogDir = new 
File(DatabaseDescriptor.getCommitLogLocation());
         replayer.recover(commitLogDir.listFiles());
     }
 
     final private Predicate<Mutation> processor;
 
-    public CommitLogTestReplayer(Predicate<Mutation> processor)
+    public CommitLogTestReplayer(CommitLog log, Predicate<Mutation> processor)
     {
-        this(ReplayPosition.NONE, processor);
+        this(log, ReplayPosition.NONE, processor);
     }
 
-    public CommitLogTestReplayer(ReplayPosition discardedPos, 
Predicate<Mutation> processor)
+    public CommitLogTestReplayer(CommitLog log, ReplayPosition discardedPos, 
Predicate<Mutation> processor)
     {
-        super(discardedPos, null, ReplayFilter.create());
+        super(log, discardedPos, null, ReplayFilter.create());
         this.processor = processor;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01a11fd2/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java 
b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java
index af85d5d..9de2628 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java
@@ -157,7 +157,7 @@ public class CommitLogUpgradeTest
         }
 
         Hasher hasher = new Hasher();
-        CommitLogTestReplayer replayer = new CommitLogTestReplayer(hasher);
+        CommitLogTestReplayer replayer = new 
CommitLogTestReplayer(CommitLog.instance, hasher);
         File[] files = new File(location).listFiles(new FilenameFilter()
         {
             @Override

Reply via email to