Author: jbellis
Date: Wed Jan 26 15:00:23 2011
New Revision: 1063751
URL: http://svn.apache.org/viewvc?rev=1063751&view=rev
Log:
add JVM shutdownhook to sync commitlog
patch by jbellis; reviewed by gdusbabek for CASSANDRA-1919
Modified:
cassandra/branches/cassandra-0.7/CHANGES.txt
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/ICommitLogExecutorService.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1063751&r1=1063750&r2=1063751&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Wed Jan 26 15:00:23 2011
@@ -1,6 +1,7 @@
0.7.2-dev
- * fix potential overflow in nodetool cfstats
- * offline nodes (CASSANDRA-1951)
+ * fix potential overflow in nodetool cfstats (CASSANDRA-2057)
+ * add JVM shutdownhook to sync commitlog (CASSANDRA-1919)
+ * allow nodes to be up without being part of normal traffic (CASSANDRA-1951)
0.7.1
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java?rev=1063751&r1=1063750&r2=1063751&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java
Wed Jan 26 15:00:23 2011
@@ -61,19 +61,12 @@ public abstract class AbstractCommitLogE
return completedTaskCount;
}
- // cassandra is crash-only so there's no need to implement the shutdown
methods
-
- public boolean isShutdown()
- {
- return false;
- }
-
public boolean isTerminated()
{
- return false;
+ throw new UnsupportedOperationException();
}
- public void shutdown()
+ public boolean isShutdown()
{
throw new UnsupportedOperationException();
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java?rev=1063751&r1=1063750&r2=1063751&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java
Wed Jan 26 15:00:23 2011
@@ -31,6 +31,8 @@ import org.apache.cassandra.utils.Wrappe
class BatchCommitLogExecutorService extends AbstractCommitLogExecutorService
implements ICommitLogExecutorService, BatchCommitLogExecutorServiceMBean
{
private final BlockingQueue<CheaterFutureTask> queue;
+ private final Thread appendingThread;
+ private volatile boolean run = true;
public BatchCommitLogExecutorService()
{
@@ -44,14 +46,15 @@ class BatchCommitLogExecutorService exte
{
public void runMayThrow() throws Exception
{
- while (true)
+ while (run)
{
- processWithSyncBatch();
- completedTaskCount++;
+ if (processWithSyncBatch())
+ completedTaskCount++;
}
}
};
- new Thread(runnable, "COMMIT-LOG-WRITER").start();
+ appendingThread = new Thread(runnable, "COMMIT-LOG-WRITER");
+ appendingThread.start();
registerMBean(this);
}
@@ -63,13 +66,15 @@ class BatchCommitLogExecutorService exte
private final ArrayList<CheaterFutureTask> incompleteTasks = new
ArrayList<CheaterFutureTask>();
private final ArrayList taskValues = new ArrayList(); // TODO not sure how
to generify this
- private void processWithSyncBatch() throws Exception
+ private boolean processWithSyncBatch() throws Exception
{
- CheaterFutureTask firstTask = queue.take();
+ CheaterFutureTask firstTask = queue.poll(100, TimeUnit.MILLISECONDS);
+ if (firstTask == null)
+ return false;
if (!(firstTask.getRawCallable() instanceof CommitLog.LogRecordAdder))
{
firstTask.run();
- return;
+ return true;
}
// attempt to do a bunch of LogRecordAdder ops before syncing
@@ -105,6 +110,7 @@ class BatchCommitLogExecutorService exte
{
incompleteTasks.get(i).set(taskValues.get(i));
}
+ return true;
}
@@ -148,6 +154,25 @@ class BatchCommitLogExecutorService exte
}
}
+ public void shutdown()
+ {
+ new Thread(new WrappedRunnable()
+ {
+ public void runMayThrow() throws InterruptedException, IOException
+ {
+ while (!queue.isEmpty())
+ Thread.sleep(100);
+ run = false;
+ appendingThread.join();
+ }
+ }, "Commitlog Shutdown").start();
+ }
+
+ public void awaitTermination() throws InterruptedException
+ {
+ appendingThread.join();
+ }
+
private static class CheaterFutureTask<V> extends FutureTask<V>
{
private final Callable rawCallable;
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=1063751&r1=1063750&r2=1063751&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
Wed Jan 26 15:00:23 2011
@@ -114,45 +114,9 @@ public class CommitLog
// All we need to do is create a new one.
segments.add(new CommitLogSegment());
- if (DatabaseDescriptor.getCommitLogSync() ==
Config.CommitLogSync.batch)
- {
- executor = new BatchCommitLogExecutorService();
- }
- else
- {
- executor = new PeriodicCommitLogExecutorService();
- final Callable syncer = new Callable()
- {
- public Object call() throws Exception
- {
- sync();
- return null;
- }
- };
-
- new Thread(new Runnable()
- {
- public void run()
- {
- while (true)
- {
- try
- {
- executor.submit(syncer).get();
-
Thread.sleep(DatabaseDescriptor.getCommitLogSyncPeriod());
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
- catch (ExecutionException e)
- {
- throw new RuntimeException(e);
- }
- }
- }
- }, "PERIODIC-COMMIT-LOG-SYNCER").start();
- }
+ executor = DatabaseDescriptor.getCommitLogSync() ==
Config.CommitLogSync.batch
+ ? new BatchCommitLogExecutorService()
+ : new PeriodicCommitLogExecutorService(this);
}
public void resetUnsafe()
@@ -527,4 +491,10 @@ public class CommitLog
return null;
}
}
+
+ public void shutdownBlocking() throws InterruptedException
+ {
+ executor.shutdown();
+ executor.awaitTermination();
+ }
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/ICommitLogExecutorService.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/ICommitLogExecutorService.java?rev=1063751&r1=1063750&r2=1063751&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/ICommitLogExecutorService.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/ICommitLogExecutorService.java
Wed Jan 26 15:00:23 2011
@@ -38,4 +38,9 @@ public interface ICommitLogExecutorServi
*/
public void add(CommitLog.LogRecordAdder adder);
+ /** shuts down the CommitLogExecutor in an orderly fashion */
+ public void shutdown();
+
+ /** Blocks until shutdown is complete. */
+ public void awaitTermination() throws InterruptedException;
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java?rev=1063751&r1=1063750&r2=1063751&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
Wed Jan 26 15:00:23 2011
@@ -21,35 +21,71 @@ package org.apache.cassandra.db.commitlo
*/
+import java.io.IOException;
import java.util.concurrent.*;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.utils.WrappedRunnable;
class PeriodicCommitLogExecutorService implements ICommitLogExecutorService,
PeriodicCommitLogExecutorServiceMBean
{
private final BlockingQueue<Runnable> queue;
protected volatile long completedTaskCount = 0;
+ private final Thread appendingThread;
+ private volatile boolean run = true;
- public PeriodicCommitLogExecutorService()
+ public PeriodicCommitLogExecutorService(final CommitLog commitLog)
{
- this(1024 * Runtime.getRuntime().availableProcessors());
- }
-
- public PeriodicCommitLogExecutorService(int queueSize)
- {
- queue = new LinkedBlockingQueue<Runnable>(queueSize);
+ queue = new LinkedBlockingQueue<Runnable>(1024 *
Runtime.getRuntime().availableProcessors());
Runnable runnable = new WrappedRunnable()
{
public void runMayThrow() throws Exception
{
- while (true)
+ while (run)
{
- queue.take().run();
+ Runnable r = queue.poll(100, TimeUnit.MILLISECONDS);
+ if (r == null)
+ continue;
+ r.run();
completedTaskCount++;
}
+ commitLog.sync();
+ }
+ };
+ appendingThread = new Thread(runnable, "COMMIT-LOG-WRITER");
+ appendingThread.start();
+
+ final Callable syncer = new Callable()
+ {
+ public Object call() throws Exception
+ {
+ commitLog.sync();
+ return null;
}
};
- new Thread(runnable, "COMMIT-LOG-WRITER").start();
+
+ new Thread(new Runnable()
+ {
+ public void run()
+ {
+ while (run)
+ {
+ try
+ {
+ submit(syncer).get();
+
Thread.sleep(DatabaseDescriptor.getCommitLogSyncPeriod());
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ catch (ExecutionException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }, "PERIODIC-COMMIT-LOG-SYNCER").start();
AbstractCommitLogExecutorService.registerMBean(this);
}
@@ -80,6 +116,25 @@ class PeriodicCommitLogExecutorService i
return ft;
}
+ public void shutdown()
+ {
+ new Thread(new WrappedRunnable()
+ {
+ public void runMayThrow() throws InterruptedException, IOException
+ {
+ while (!queue.isEmpty())
+ Thread.sleep(100);
+ run = false;
+ appendingThread.join();
+ }
+ }, "Commitlog Shutdown").start();
+ }
+
+ public void awaitTermination() throws InterruptedException
+ {
+ appendingThread.join();
+ }
+
public long getPendingTasks()
{
return queue.size();
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java?rev=1063751&r1=1063750&r2=1063751&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
Wed Jan 26 15:00:23 2011
@@ -33,6 +33,8 @@ import javax.management.ObjectName;
import com.google.common.base.Charsets;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
+
+import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.locator.*;
import org.apache.log4j.Level;
import org.apache.commons.lang.StringUtils;
@@ -378,6 +380,22 @@ public class StorageService implements I
}
}
+ // daemon threads, like our executors', continue to run while shutdown
hooks are invoked
+ Thread drainOnShutdown = new Thread(new WrappedRunnable()
+ {
+ public void runMayThrow() throws ExecutionException,
InterruptedException, IOException
+ {
+ ThreadPoolExecutor mutationStage =
StageManager.getStage(Stage.MUTATION);
+ if (!mutationStage.isShutdown())
+ {
+ mutationStage.shutdown();
+ mutationStage.awaitTermination(1, TimeUnit.SECONDS);
+ CommitLog.instance.shutdownBlocking();
+ }
+ }
+ });
+ Runtime.getRuntime().addShutdownHook(drainOnShutdown);
+
if (Boolean.parseBoolean(System.getProperty("cassandra.join_ring",
"true")))
{
joinTokenRing();
@@ -1899,6 +1917,8 @@ public class StorageService implements I
ColumnFamilyStore.postFlushExecutor.shutdown();
ColumnFamilyStore.postFlushExecutor.awaitTermination(60,
TimeUnit.SECONDS);
+ CommitLog.instance.shutdownBlocking();
+
// want to make sure that any segments deleted as a result of flushing
are gone.
DeletionService.waitFor();