This is an automated email from the ASF dual-hosted git repository.
brandonwilliams pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
new be574fc34b Fix default file system error handler for disk_failure_policy die
be574fc34b is described below
commit be574fc34ba9834929f1618ab63dd74446cd2683
Author: Brandon Williams <[email protected]>
AuthorDate: Thu Mar 9 09:53:58 2023 -0600
Fix default file system error handler for disk_failure_policy die
Patch by Runtian Liu; reviewed by brandonwilliams and smiklosovic for
CASSANDRA-18294
---
CHANGES.txt | 1 +
.../cassandra/service/DefaultFSErrorHandler.java | 2 +
...ava => JVMStabilityInspectorThrowableTest.java} | 72 ++++++++---
.../service/DefaultFSErrorHandlerTest.java | 121 ++++++++++++++++++
.../cassandra/service/DiskFailurePolicyTest.java | 135 +++++++++++++++++++++
.../org/apache/cassandra/utils/KillerForTests.java | 5 +
6 files changed, 321 insertions(+), 15 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 7994824ad3..aa1e60427a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.29
+ * Fix default file system error handler for disk_failure_policy die (CASSANDRA-18294)
* Introduce check for names of test classes (CASSANDRA-17964)
* Suppress CVE-2022-41915 (CASSANDRA-18147)
* Suppress CVE-2021-1471, CVE-2021-3064, CVE-2021-4235 (CASSANDRA-18149)
diff --git a/src/java/org/apache/cassandra/service/DefaultFSErrorHandler.java b/src/java/org/apache/cassandra/service/DefaultFSErrorHandler.java
index 1c81f65e44..00029e3f40 100644
--- a/src/java/org/apache/cassandra/service/DefaultFSErrorHandler.java
+++ b/src/java/org/apache/cassandra/service/DefaultFSErrorHandler.java
@@ -44,6 +44,7 @@ public class DefaultFSErrorHandler implements FSErrorHandler
switch (DatabaseDescriptor.getDiskFailurePolicy())
{
+ case die:
case stop_paranoid:
// exception not logged here on purpose as it is already logged
logger.error("Stopping transports as disk_failure_policy is "
+ DatabaseDescriptor.getDiskFailurePolicy());
@@ -60,6 +61,7 @@ public class DefaultFSErrorHandler implements FSErrorHandler
switch (DatabaseDescriptor.getDiskFailurePolicy())
{
+ case die:
case stop_paranoid:
case stop:
// exception not logged here on purpose as it is already logged
diff --git a/test/distributed/org/apache/cassandra/distributed/test/JVMStabilityInspectorCorruptSSTableExceptionTest.java b/test/distributed/org/apache/cassandra/distributed/test/JVMStabilityInspectorThrowableTest.java
similarity index 70%
rename from test/distributed/org/apache/cassandra/distributed/test/JVMStabilityInspectorCorruptSSTableExceptionTest.java
rename to test/distributed/org/apache/cassandra/distributed/test/JVMStabilityInspectorThrowableTest.java
index 98ca496ffc..d7aeccaf6f 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/JVMStabilityInspectorCorruptSSTableExceptionTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/JVMStabilityInspectorThrowableTest.java
@@ -19,11 +19,15 @@
package org.apache.cassandra.distributed.test;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import org.junit.Assert;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import org.apache.cassandra.config.Config.DiskFailurePolicy;
import org.apache.cassandra.db.ColumnFamilyStore;
@@ -39,6 +43,8 @@ import org.apache.cassandra.distributed.api.IIsolatedExecutor.SerializableCallab
import org.apache.cassandra.distributed.shared.AbstractBuilder;
import org.apache.cassandra.distributed.shared.NetworkTopology;
import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.io.FSError;
+import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.sstable.format.ForwardingSSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableReader;
@@ -51,21 +57,46 @@ import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
-public class JVMStabilityInspectorCorruptSSTableExceptionTest extends
TestBaseImpl
+@RunWith(Parameterized.class)
+public class JVMStabilityInspectorThrowableTest extends TestBaseImpl
{
- @Test
- public void
testAbstractLocalAwareExecutorServiceOnIgnoredDiskFailurePolicy() throws
Exception
+ private DiskFailurePolicy testPolicy;
+ private boolean testCorrupted;
+ private boolean expectNativeTransportRunning;;
+ private boolean expectGossiperEnabled;
+
+ public JVMStabilityInspectorThrowableTest(DiskFailurePolicy policy,
boolean testCorrupted,
+ boolean
expectNativeTransportRunning, boolean expectGossiperEnabled)
+ {
+ this.testPolicy = policy;
+ this.testCorrupted = testCorrupted;
+ this.expectNativeTransportRunning = expectNativeTransportRunning;
+ this.expectGossiperEnabled = expectGossiperEnabled;
+ }
+
+ /**
+ * Scenarios for the parameterized runner. Each row is
+ * (policy, testCorrupted, expectNativeTransportRunning, expectGossiperEnabled), in the
+ * constructor's parameter order. testCorrupted selects whether the injected read failure is a
+ * CorruptSSTableException (true) or an FSReadError (false).
+ * NOTE(review): DiskFailurePolicy.die has no row here, so this distributed test does not cover it.
+ */
+ @Parameterized.Parameters
+ public static Collection<Object[]> generateData()
{
- test(DiskFailurePolicy.ignore, true, true);
+ return Arrays.asList(new Object[][]{
+ { DiskFailurePolicy.ignore, true, true, true},
+ { DiskFailurePolicy.stop, true, true, true},
+ { DiskFailurePolicy.stop_paranoid, true, false,
false},
+ { DiskFailurePolicy.best_effort, true, true,
true},
+ { DiskFailurePolicy.ignore, false, true, true},
+ { DiskFailurePolicy.stop, false, false, false},
+ { DiskFailurePolicy.stop_paranoid, false, false,
false},
+ { DiskFailurePolicy.best_effort, false, true,
true}
+ }
+ );
}
+ // Runs the single scenario described by this instance's parameter row (from generateData()).
@Test
- public void
testAbstractLocalAwareExecutorServiceOnIgnoredDiskFailurePolicy() throws
Exception
+ public void testAbstractLocalAwareExecutorServiceOnPolicies() throws
Exception
{
- test(DiskFailurePolicy.stop_paranoid, false, false);
+ test(testPolicy, testCorrupted, expectNativeTransportRunning,
expectGossiperEnabled);
}
- private static void test(DiskFailurePolicy policy, boolean
expectNativeTransportRunning, boolean expectGossiperEnabled) throws Exception
+ private static void test(DiskFailurePolicy policy, boolean
shouldTestCorrupted, boolean expectNativeTransportRunning, boolean
expectGossiperEnabled) throws Exception
{
String table = policy.name();
try (final Cluster cluster = init(getCluster(policy).start()))
@@ -84,16 +115,16 @@ public class JVMStabilityInspectorCorruptSSTableExceptionTest extends TestBaseIm
cluster.schemaChange("CREATE TABLE " + KEYSPACE + "." + table + "
(id bigint PRIMARY KEY)");
node.executeInternal("INSERT INTO " + KEYSPACE + "." + table + "
(id) VALUES (?)", 0L);
- corruptTable(node, KEYSPACE, table);
+ throwThrowable(node, KEYSPACE, table, shouldTestCorrupted);
try
{
cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE +
'.' + table + " WHERE id=?", ConsistencyLevel.ONE, 0L);
- Assert.fail("Select should fail as we corrupted SSTable on
purpose.");
+ Assert.fail("Select should fail as we expect corrupted sstable
or FS error.");
}
catch (final Exception ex)
{
- // we expect that above query fails as we corrupted an sstable
+ // we expect that above query fails as we corrupted an sstable
or throw FS error when read
}
waitForStop(!expectGossiperEnabled, node, new
SerializableCallable<Boolean>()
@@ -154,7 +185,7 @@ public class JVMStabilityInspectorCorruptSSTableExceptionTest extends TestBaseIm
}
}
- private static void corruptTable(IInvokableInstance node, String keyspace,
String table)
+ private static void throwThrowable(IInvokableInstance node, String
keyspace, String table, boolean shouldTestCorrupted)
{
node.runOnInstance(() -> {
ColumnFamilyStore cf =
Keyspace.open(keyspace).getColumnFamilyStore(table);
@@ -163,7 +194,7 @@ public class JVMStabilityInspectorCorruptSSTableExceptionTest extends TestBaseIm
Set<SSTableReader> remove = cf.getLiveSSTables();
Set<SSTableReader> replace = new HashSet<>();
for (SSTableReader r : remove)
- replace.add(new CorruptedSSTableReader(r));
+ replace.add(new CorruptedSSTableReader(r,
shouldTestCorrupted));
cf.getTracker().removeUnsafe(remove);
cf.addSSTables(replace);
@@ -180,26 +211,37 @@ public class JVMStabilityInspectorCorruptSSTableExceptionTest extends TestBaseIm
+ /**
+ * Forwarding reader whose two iterator(...) overloads always throw instead of reading:
+ * a CorruptSSTableException when shouldThrowCorrupted is true, otherwise an FSReadError.
+ * throwThrowable() swaps a table's live sstables for instances of this class.
+ * NOTE(review): the class name predates the FSReadError mode; it no longer only simulates corruption.
+ */
private static final class CorruptedSSTableReader extends
ForwardingSSTableReader
{
- public CorruptedSSTableReader(SSTableReader delegate)
+ // true -> reads throw CorruptSSTableException; false -> reads throw FSReadError
+ private boolean shouldThrowCorrupted;
+ public CorruptedSSTableReader(SSTableReader delegate, boolean
shouldThrowCorrupted)
{
super(delegate);
+ this.shouldThrowCorrupted = shouldThrowCorrupted;
}
@Override
public SliceableUnfilteredRowIterator iterator(DecoratedKey key,
ColumnFilter selectedColumns, boolean reversed, boolean isForThrift,
SSTableReadsListener listener)
{
- throw throwCorrupted();
+ if (shouldThrowCorrupted)
+ throw throwCorrupted();
+ throw throwFSError();
}
@Override
public SliceableUnfilteredRowIterator iterator(FileDataInput file,
DecoratedKey key, RowIndexEntry indexEntry, ColumnFilter selectedColumns,
boolean reversed, boolean isForThrift)
{
- throw throwCorrupted();
+ if (shouldThrowCorrupted)
+ throw throwCorrupted();
+ throw throwFSError();
}
+ // Always throws. The declared return type lets call sites write "throw throwCorrupted();",
+ // so the compiler treats those paths as terminating without a dummy return.
private CorruptSSTableException throwCorrupted()
{
throw new CorruptSSTableException(new IOException("failed to get
position"), descriptor.baseFilename());
}
+
+ // Same always-throw pattern as throwCorrupted(), raising an FSReadError for the FS-error scenario.
+ private FSError throwFSError()
+ {
+ throw new FSReadError(new IOException("failed to get position"),
descriptor.baseFilename());
+ }
}
}
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/service/DefaultFSErrorHandlerTest.java b/test/unit/org/apache/cassandra/service/DefaultFSErrorHandlerTest.java
new file mode 100644
index 0000000000..f6ed9da7ed
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/DefaultFSErrorHandlerTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.io.FSErrorHandler;
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Checks how DefaultFSErrorHandler reacts to an FSReadError and to a CorruptSSTableException
+ * under each disk_failure_policy, using Gossiper.instance.isEnabled() afterwards as the signal.
+ */
+@RunWith(Parameterized.class)
+public class DefaultFSErrorHandlerTest
+{
+ private FSErrorHandler handler = new DefaultFSErrorHandler();
+ // policy in effect before each test case; restored in teardown()
+ Config.DiskFailurePolicy oldDiskPolicy;
+ // policy under test, taken from the parameter row
+ Config.DiskFailurePolicy testDiskPolicy;
+ // expected gossip state after handleFSError(...) and after handleCorruptSSTable(...) respectively
+ private boolean gossipRunningFSError;
+ private boolean gossipRunningCorruptedSStableException;
+
+ @BeforeClass
+ public static void defineSchema() throws ConfigurationException
+ {
+ SchemaLoader.prepareServer();
+ CassandraDaemon daemon = new CassandraDaemon();
+ // startup must be marked completed: while startup is still in progress an FS error can kill the
+ // JVM for several policies (presumably die/stop/stop_paranoid, as DiskFailurePolicyTest's
+ // startup rows expect), rather than taking the post-startup path exercised here
+ daemon.completeSetup();
+ StorageService.instance.registerDaemon(daemon);
+ StorageService.instance.initServer();
+ }
+
+ @AfterClass
+ public static void shutdown()
+ {
+ StorageService.instance.stopClient();
+ }
+
+ // Re-enables gossip (a previous case may have expected it to be stopped) and records the current policy.
+ @Before
+ public void setup()
+ {
+ StorageService.instance.startGossiping();
+ assertTrue(Gossiper.instance.isEnabled());
+ oldDiskPolicy = DatabaseDescriptor.getDiskFailurePolicy();
+ }
+
+ /**
+ * @param policy disk_failure_policy to test
+ * @param gossipRunningFSError expected gossip state after handling an FSReadError
+ * @param gossipRunningCorruptedSStableException expected gossip state after handling a CorruptSSTableException
+ */
+ public DefaultFSErrorHandlerTest(Config.DiskFailurePolicy policy,
+ boolean gossipRunningFSError,
+ boolean
gossipRunningCorruptedSStableException)
+ {
+ this.testDiskPolicy = policy;
+ this.gossipRunningFSError = gossipRunningFSError;
+ this.gossipRunningCorruptedSStableException =
gossipRunningCorruptedSStableException;
+ }
+
+ // Rows: (policy, gossip enabled after FSReadError, gossip enabled after CorruptSSTableException).
+ // die is expected to behave like stop_paranoid here: gossip is stopped for both error kinds.
+ @Parameterized.Parameters
+ public static Collection<Object[]> generateData()
+ {
+ return Arrays.asList(new Object[][]{
+ { Config.DiskFailurePolicy.die, false, false},
+ { Config.DiskFailurePolicy.ignore, true, true},
+ { Config.DiskFailurePolicy.stop, false, true},
+ { Config.DiskFailurePolicy.stop_paranoid, false,
false},
+ { Config.DiskFailurePolicy.best_effort, true,
true}
+ }
+ );
+ }
+
+ @After
+ public void teardown()
+ {
+ DatabaseDescriptor.setDiskFailurePolicy(oldDiskPolicy);
+ }
+
+ // Feeds an FSReadError straight to the handler and checks the resulting gossip state.
+ @Test
+ public void testFSErrors()
+ {
+ DatabaseDescriptor.setDiskFailurePolicy(testDiskPolicy);
+ handler.handleFSError(new FSReadError(new IOException(), "blah"));
+ assertEquals(gossipRunningFSError, Gossiper.instance.isEnabled());
+ }
+
+ // Feeds a CorruptSSTableException straight to the handler and checks the resulting gossip state.
+ @Test
+ public void testCorruptSSTableException()
+ {
+ DatabaseDescriptor.setDiskFailurePolicy(testDiskPolicy);
+ handler.handleCorruptSSTable(new CorruptSSTableException(new
IOException(), "blah"));
+ assertEquals(gossipRunningCorruptedSStableException,
Gossiper.instance.isEnabled());
+ }
+}
diff --git a/test/unit/org/apache/cassandra/service/DiskFailurePolicyTest.java b/test/unit/org/apache/cassandra/service/DiskFailurePolicyTest.java
new file mode 100644
index 0000000000..90e85e9cc0
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/DiskFailurePolicyTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.Config.DiskFailurePolicy;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.KillerForTests;
+
+/**
+ * Feeds an FSReadError or a CorruptSSTableException to JVMStabilityInspector.inspectThrowable under
+ * each disk_failure_policy, both while startup is still in progress and after it has completed.
+ * A KillerForTests replaces the real killer, so a JVM kill is only recorded, not performed.
+ * NOTE(review): there is no @AfterClass; gossip started in setup() is never stopped by this class.
+ * NOTE(review): DiskFailurePolicy.best_effort has no rows.
+ */
+@RunWith(Parameterized.class)
+public class DiskFailurePolicyTest
+{
+ // state saved in setup() and restored in teardown()
+ DiskFailurePolicy originalDiskFailurePolicy;
+ JVMStabilityInspector.Killer originalKiller;
+ // killer installed for each case; records whether (and how quietly) a kill was requested
+ KillerForTests killerForTests;
+ // parameter row, see the constructor
+ DiskFailurePolicy testPolicy;
+ boolean isStartUpInProgress;
+ Throwable t;
+ boolean expectGossipRunning;
+ boolean expectJVMKilled;
+ boolean expectJVMKilledQuiet;
+
+
+ @BeforeClass
+ public static void defineSchema() throws ConfigurationException
+ {
+ SchemaLoader.prepareServer();
+ StorageService.instance.initServer();
+ // presumably so that inspectThrowable's FS-error handling reaches DefaultFSErrorHandler — confirm
+ FileUtils.setFSErrorHandler(new DefaultFSErrorHandler());
+ }
+
+ /**
+ * @param testPolicy disk_failure_policy to test
+ * @param isStartUpInProgress if true, the registered daemon is not marked as setup-completed
+ * @param t throwable passed to JVMStabilityInspector.inspectThrowable
+ * @param expectGossipRunning expected gossip state; only checked when no kill is expected
+ * @param jvmKilled expected KillerForTests.wasKilled()
+ * @param jvmKilledQuiet expected KillerForTests.wasKilledQuietly()
+ */
+ public DiskFailurePolicyTest(DiskFailurePolicy testPolicy, boolean
isStartUpInProgress, Throwable t,
+ boolean expectGossipRunning, boolean
jvmKilled, boolean jvmKilledQuiet)
+ {
+ this.testPolicy = testPolicy;
+ this.isStartUpInProgress = isStartUpInProgress;
+ this.t = t;
+ this.expectGossipRunning = expectGossipRunning;
+ this.expectJVMKilled = jvmKilled;
+ this.expectJVMKilledQuiet = jvmKilledQuiet;
+ }
+
+ // Rows: (policy, isStartUpInProgress, throwable, expectGossipRunning, expectJVMKilled, expectJVMKilledQuiet).
+ // The first eight rows are during startup, the last eight after startup has completed.
+ @Parameterized.Parameters
+ public static Collection<Object[]> generateData()
+ {
+ return Arrays.asList(new Object[][]{
+ { Config.DiskFailurePolicy.die, true, new
FSReadError(new IOException(), "blah"), false, true, true},
+ { DiskFailurePolicy.ignore, true, new
FSReadError(new IOException(), "blah"), true, false, false},
+ { DiskFailurePolicy.stop, true, new
FSReadError(new IOException(), "blah"), false, true, true},
+ { DiskFailurePolicy.stop_paranoid, true, new
FSReadError(new IOException(), "blah"), false, true, true},
+ { Config.DiskFailurePolicy.die, true, new
CorruptSSTableException(new IOException(), "blah"), false, true, true},
+ { DiskFailurePolicy.ignore, true, new
CorruptSSTableException(new IOException(), "blah"), true, false, false},
+ { DiskFailurePolicy.stop, true, new
CorruptSSTableException(new IOException(), "blah"), false, true, true},
+ { DiskFailurePolicy.stop_paranoid, true, new
CorruptSSTableException(new IOException(), "blah"), false, true, true},
+ { Config.DiskFailurePolicy.die, false, new
FSReadError(new IOException(), "blah"), false, true, false},
+ { DiskFailurePolicy.ignore, false, new
FSReadError(new IOException(), "blah"), true, false, false},
+ { DiskFailurePolicy.stop, false, new
FSReadError(new IOException(), "blah"), false, false, false},
+ { DiskFailurePolicy.stop_paranoid, false, new
FSReadError(new IOException(), "blah"), false, false, false},
+ { Config.DiskFailurePolicy.die, false, new
CorruptSSTableException(new IOException(), "blah"), false, true, false},
+ { DiskFailurePolicy.ignore, false, new
CorruptSSTableException(new IOException(), "blah"), true, false, false},
+ { DiskFailurePolicy.stop, false, new
CorruptSSTableException(new IOException(), "blah"), true, false, false},
+ { DiskFailurePolicy.stop_paranoid, false, new
CorruptSSTableException(new IOException(), "blah"), false, false, false}
+ }
+ );
+ }
+
+ // Registers a fresh daemon (marked setup-completed unless this row simulates startup), installs a
+ // fresh KillerForTests, records the current policy and makes sure gossip is running.
+ @Before
+ public void setup()
+ {
+ CassandraDaemon daemon = new CassandraDaemon();
+ if (!isStartUpInProgress)
+ daemon.completeSetup(); // mark startup completed
+ StorageService.instance.registerDaemon(daemon);
+ killerForTests = new KillerForTests();
+ originalKiller = JVMStabilityInspector.replaceKiller(killerForTests);
+ originalDiskFailurePolicy = DatabaseDescriptor.getDiskFailurePolicy();
+ StorageService.instance.startGossiping();
+ Assert.assertTrue(Gossiper.instance.isEnabled());
+ }
+
+ @After
+ public void teardown()
+ {
+ JVMStabilityInspector.replaceKiller(originalKiller);
+ DatabaseDescriptor.setDiskFailurePolicy(originalDiskFailurePolicy);
+ }
+
+ @Test
+ public void testPolicies()
+ {
+ DatabaseDescriptor.setDiskFailurePolicy(testPolicy);
+ JVMStabilityInspector.inspectThrowable(t);
+ Assert.assertEquals(expectJVMKilled, killerForTests.wasKilled());
+ Assert.assertEquals(expectJVMKilledQuiet,
killerForTests.wasKilledQuietly());
+ if (!expectJVMKilled) {
+ // only verify gossip if JVM is not killed
+ Assert.assertEquals(expectGossipRunning,
Gossiper.instance.isEnabled());
+ }
+ }
+}
diff --git a/test/unit/org/apache/cassandra/utils/KillerForTests.java b/test/unit/org/apache/cassandra/utils/KillerForTests.java
index abc7952322..ad3a27436e 100644
--- a/test/unit/org/apache/cassandra/utils/KillerForTests.java
+++ b/test/unit/org/apache/cassandra/utils/KillerForTests.java
@@ -29,6 +29,11 @@ public class KillerForTests extends JVMStabilityInspector.Killer
+ /**
+ * Records the kill request instead of terminating the JVM. Only the first request is recorded:
+ * later calls return early, so killed/quiet keep the values set by the first call.
+ */
@Override
protected void killCurrentJVM(Throwable t, boolean quiet)
{
+ if (killed)
+ {
+ // A JVM can only be killed once; keep the state recorded by the first call
+ return;
+ }
this.killed = true;
this.quiet = quiet;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]