Merge branch 'cassandra-2.2' into cassandra-3.0
Conflicts:
test/unit/org/apache/cassandra/db/CommitLogTest.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e2ad7d56
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e2ad7d56
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e2ad7d56
Branch: refs/heads/cassandra-3.0
Commit: e2ad7d5695d4044a021f14933762f105390c9beb
Parents: 7ea2ce1 0871533
Author: Benedict Elliott Smith <[email protected]>
Authored: Sun Aug 9 09:41:38 2015 +0200
Committer: Benedict Elliott Smith <[email protected]>
Committed: Sun Aug 9 09:41:38 2015 +0200
----------------------------------------------------------------------
.../cassandra/service/CassandraDaemon.java | 8 +
.../cassandra/utils/JVMStabilityInspector.java | 9 +-
.../org/apache/cassandra/db/CommitLogTest.java | 161 ++++++++++++++++++-
.../apache/cassandra/utils/KillerForTests.java | 11 +-
4 files changed, 184 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2ad7d56/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/CassandraDaemon.java
index fddf593,2020201..3b123c3
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@@ -40,8 -40,10 +40,10 @@@ import com.addthis.metrics3.reporter.co
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistryListener;
import com.codahale.metrics.SharedMetricRegistries;
+ import com.google.common.annotations.VisibleForTesting;
+ import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Uninterruptibles;
-import org.apache.cassandra.metrics.DefaultNameFactory;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2ad7d56/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2ad7d56/test/unit/org/apache/cassandra/db/CommitLogTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/CommitLogTest.java
index 21bdd9b,536f0cb..512a3d2
--- a/test/unit/org/apache/cassandra/db/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/CommitLogTest.java
@@@ -47,20 -42,31 +48,29 @@@ import org.junit.BeforeClass
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
+ import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.config.ParameterizedClass;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
- import org.apache.cassandra.db.commitlog.CommitLogSegment;
+ import org.apache.cassandra.db.commitlog.CommitLogSegmentManager;
import org.apache.cassandra.db.commitlog.ReplayPosition;
+ import org.apache.cassandra.db.commitlog.CommitLogSegment;
import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.filter.NamesQueryFilter;
import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.io.util.ByteBufferDataInput;
import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.locator.SimpleStrategy;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.vint.VIntCoding;
+ import org.apache.cassandra.service.CassandraDaemon;
+ import org.apache.cassandra.service.StorageService;
+ import org.apache.cassandra.utils.ByteBufferUtil;
+ import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.JVMStabilityInspector;
+ import org.apache.cassandra.utils.KillerForTests;
public class CommitLogTest
{
@@@ -307,40 -319,179 +317,187 @@@
}
@Test
- public void testTruncateWithoutSnapshot() throws IOException
+ public void testCommitFailurePolicy_stop() throws ConfigurationException
+ {
+ CassandraDaemon daemon = new CassandraDaemon();
+ daemon.completeSetup(); //startup must be completed, otherwise commit
log failure must kill JVM regardless of failure policy
+ StorageService.instance.registerDaemon(daemon);
+
+ // Need storage service active so stop policy can shutdown gossip
+ StorageService.instance.initServer();
+ Assert.assertTrue(Gossiper.instance.isEnabled());
+
+ Config.CommitFailurePolicy oldPolicy =
DatabaseDescriptor.getCommitFailurePolicy();
+ try
+ {
+
DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.stop);
+ CommitLog.handleCommitError("Test stop error", new Throwable());
+ Assert.assertFalse(Gossiper.instance.isEnabled());
+ }
+ finally
+ {
+ DatabaseDescriptor.setCommitFailurePolicy(oldPolicy);
+ }
+ }
+
+ @Test
+ public void testCommitFailurePolicy_die()
+ {
+ CassandraDaemon daemon = new CassandraDaemon();
+ daemon.completeSetup(); //startup must be completed, otherwise commit
log failure must kill JVM regardless of failure policy
+ StorageService.instance.registerDaemon(daemon);
+
+ KillerForTests killerForTests = new KillerForTests();
+ JVMStabilityInspector.Killer originalKiller =
JVMStabilityInspector.replaceKiller(killerForTests);
+ Config.CommitFailurePolicy oldPolicy =
DatabaseDescriptor.getCommitFailurePolicy();
+ try
+ {
+
DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.die);
+ CommitLog.handleCommitError("Testing die policy", new
Throwable());
+ Assert.assertTrue(killerForTests.wasKilled());
+ Assert.assertFalse(killerForTests.wasKilledQuietly()); //only
killed quietly on startup failure
+ }
+ finally
+ {
+ DatabaseDescriptor.setCommitFailurePolicy(oldPolicy);
+ JVMStabilityInspector.replaceKiller(originalKiller);
+ }
+ }
+
+ @Test
+ public void testCommitFailurePolicy_mustDieIfNotStartedUp()
+ {
+ //startup was not completed successfuly (since method completeSetup()
was not called)
+ CassandraDaemon daemon = new CassandraDaemon();
+ StorageService.instance.registerDaemon(daemon);
+
+ KillerForTests killerForTests = new KillerForTests();
+ JVMStabilityInspector.Killer originalKiller =
JVMStabilityInspector.replaceKiller(killerForTests);
+ Config.CommitFailurePolicy oldPolicy =
DatabaseDescriptor.getCommitFailurePolicy();
+ try
+ {
+ //even though policy is ignore, JVM must die because Daemon has
not finished initializing
+
DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.ignore);
+ CommitLog.handleCommitError("Testing die policy", new
Throwable());
+ Assert.assertTrue(killerForTests.wasKilled());
+ Assert.assertTrue(killerForTests.wasKilledQuietly()); //killed
quietly due to startup failure
+ }
+ finally
+ {
+ DatabaseDescriptor.setCommitFailurePolicy(oldPolicy);
+ JVMStabilityInspector.replaceKiller(originalKiller);
+ }
+ }
+
+ @Test
+ public void testCommitLogFailureBeforeInitialization_mustKillJVM() throws
Exception
+ {
+ //startup was not completed successfuly (since method completeSetup()
was not called)
+ CassandraDaemon daemon = new CassandraDaemon();
+ StorageService.instance.registerDaemon(daemon);
+
+ //let's make the commit log directory non-writable
+ File commitLogDir = new
File(DatabaseDescriptor.getCommitLogLocation());
+ commitLogDir.setWritable(false);
+
+ KillerForTests killerForTests = new KillerForTests();
+ JVMStabilityInspector.Killer originalKiller =
JVMStabilityInspector.replaceKiller(killerForTests);
+ Config.CommitFailurePolicy oldPolicy =
DatabaseDescriptor.getCommitFailurePolicy();
+ try
+ {
+
DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.ignore);
+
+ //now let's create a commit log segment manager and wait for it
to fail
+ new CommitLogSegmentManager(CommitLog.instance);
+
+ //busy wait since commitlogsegmentmanager spawns another thread
+ int retries = 0;
+ while (!killerForTests.wasKilled() && retries++ < 5)
+ Thread.sleep(10);
+
+ //since failure was before CassandraDaemon startup, the JVM must
be killed
+ Assert.assertTrue(killerForTests.wasKilled());
+ Assert.assertTrue(killerForTests.wasKilledQuietly()); //killed
quietly due to startup failure
+ }
+ finally
+ {
+ DatabaseDescriptor.setCommitFailurePolicy(oldPolicy);
+ JVMStabilityInspector.replaceKiller(originalKiller);
+ commitLogDir.setWritable(true);
+ }
+ }
+
+ @Test
+ public void
testCommitLogFailureAfterInitialization_mustRespectFailurePolicy() throws
Exception
+ {
+ //startup was not completed successfuly (since method completeSetup()
was not called)
+ CassandraDaemon daemon = new CassandraDaemon();
+ daemon.completeSetup(); //startup must be completed, otherwise commit
log failure must kill JVM regardless of failure policy
+ StorageService.instance.registerDaemon(daemon);
+
+ //let's make the commit log directory non-writable
+ File commitLogDir = new
File(DatabaseDescriptor.getCommitLogLocation());
+ commitLogDir.setWritable(false);
+
+ KillerForTests killerForTests = new KillerForTests();
+ JVMStabilityInspector.Killer originalKiller =
JVMStabilityInspector.replaceKiller(killerForTests);
+ Config.CommitFailurePolicy oldPolicy =
DatabaseDescriptor.getCommitFailurePolicy();
+ try
+ {
+
DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.ignore);
+
+ //now let's create a commit log segment manager and wait for it
to fail
+ new CommitLogSegmentManager(CommitLog.instance);
+
+ //wait commit log segment manager thread to execute
+ Thread.sleep(50);
+
+ //error policy is set to IGNORE, so JVM must not be killed if
error ocurs after startup
+ Assert.assertFalse(killerForTests.wasKilled());
+ }
+ finally
+ {
+ DatabaseDescriptor.setCommitFailurePolicy(oldPolicy);
+ JVMStabilityInspector.replaceKiller(originalKiller);
+ commitLogDir.setWritable(true);
+ }
+ }
+
+ @Test
+ public void testTruncateWithoutSnapshot() throws ExecutionException,
InterruptedException, IOException
{
- CommitLog.instance.resetUnsafe(true);
- boolean prev = DatabaseDescriptor.isAutoSnapshot();
- DatabaseDescriptor.setAutoSnapshot(false);
- ColumnFamilyStore cfs1 =
Keyspace.open(KEYSPACE1).getColumnFamilyStore("Standard1");
- ColumnFamilyStore cfs2 =
Keyspace.open(KEYSPACE1).getColumnFamilyStore("Standard2");
-
- final Mutation rm1 = new Mutation(KEYSPACE1, bytes("k"));
- rm1.add("Standard1", Util.cellname("c1"), ByteBuffer.allocate(100),
0);
- rm1.apply();
- cfs1.truncateBlocking();
- DatabaseDescriptor.setAutoSnapshot(prev);
- final Mutation rm2 = new Mutation(KEYSPACE1, bytes("k"));
- rm2.add("Standard2", Util.cellname("c1"),
ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize() / 4), 0);
-
- for (int i = 0 ; i < 5 ; i++)
- CommitLog.instance.add(rm2);
-
- Assert.assertEquals(2, CommitLog.instance.activeSegments());
- ReplayPosition position = CommitLog.instance.getContext();
- for (Keyspace ks : Keyspace.system())
- for (ColumnFamilyStore syscfs : ks.getColumnFamilyStores())
-
CommitLog.instance.discardCompletedSegments(syscfs.metadata.cfId, position);
- CommitLog.instance.discardCompletedSegments(cfs2.metadata.cfId,
position);
- Assert.assertEquals(1, CommitLog.instance.activeSegments());
+ boolean originalState = DatabaseDescriptor.isAutoSnapshot();
+ try
+ {
+ CommitLog.instance.resetUnsafe(true);
+ boolean prev = DatabaseDescriptor.isAutoSnapshot();
+ DatabaseDescriptor.setAutoSnapshot(false);
+ ColumnFamilyStore cfs1 =
Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+ ColumnFamilyStore cfs2 =
Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD2);
+
+ new RowUpdateBuilder(cfs1.metadata, 0,
"k").clustering("bytes").add("val",
ByteBuffer.allocate(100)).build().applyUnsafe();
+ cfs1.truncateBlocking();
+ DatabaseDescriptor.setAutoSnapshot(prev);
+ Mutation m2 = new RowUpdateBuilder(cfs2.metadata, 0, "k")
+ .clustering("bytes")
+ .add("val",
ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize() / 4))
+ .build();
+
+ for (int i = 0 ; i < 5 ; i++)
+ CommitLog.instance.add(m2);
+
+ assertEquals(2, CommitLog.instance.activeSegments());
+ ReplayPosition position = CommitLog.instance.getContext();
+ for (Keyspace ks : Keyspace.system())
+ for (ColumnFamilyStore syscfs : ks.getColumnFamilyStores())
+
CommitLog.instance.discardCompletedSegments(syscfs.metadata.cfId, position);
+ CommitLog.instance.discardCompletedSegments(cfs2.metadata.cfId,
position);
+ assertEquals(1, CommitLog.instance.activeSegments());
+ }
+ finally
+ {
+ DatabaseDescriptor.setAutoSnapshot(originalState);
+ }
}
@Test