This is an automated email from the ASF dual-hosted git repository.
smiklosovic pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 20d80118ac Add the ability to disable bulk loading of SSTables
20d80118ac is described below
commit 20d80118ac02d853d501e3f5c24a63a12d4010f5
Author: Runtian <[email protected]>
AuthorDate: Tue Aug 29 16:52:56 2023 -0700
Add the ability to disable bulk loading of SSTables
patch by Runtian Liu; reviewed by Stefan Miklosovic, Andres de la Peña and
Brandon Williams for CASSANDRA-18781
---
CHANGES.txt | 1 +
NEWS.txt | 2 +
conf/cassandra.yaml | 3 +
src/java/org/apache/cassandra/config/Config.java | 1 +
.../apache/cassandra/config/GuardrailsOptions.java | 14 ++
.../apache/cassandra/db/guardrails/Guardrail.java | 18 ++-
.../apache/cassandra/db/guardrails/Guardrails.java | 21 +++
.../cassandra/db/guardrails/GuardrailsConfig.java | 7 +
.../cassandra/db/guardrails/GuardrailsMBean.java | 12 ++
.../streaming/StreamDeserializingTask.java | 31 +++-
.../guardrails/GuardrailBulkLoadEnabledTest.java | 179 +++++++++++++++++++++
11 files changed, 284 insertions(+), 5 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index f9fdb07388..9b38559d7d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.1
+ * Add the ability to disable bulk loading of SSTables (CASSANDRA-18781)
* Clean up obsolete functions and simplify cql_version handling in cqlsh
(CASSANDRA-18787)
Merged from 5.0:
* Do not log stacktrace on mismatched cache and schema version and checksum
error in AutoSavingCache (CASSANDRA-18862)
diff --git a/NEWS.txt b/NEWS.txt
index c89978abb3..26de3a8045 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -71,6 +71,8 @@ using the provided 'sstableupgrade' tool.
New features
------------
+ - New Guardrails added:
+ - Whether bulk loading of SSTables is allowed.
Upgrading
---------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index c41c3cb973..1e861a2144 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1889,6 +1889,9 @@ drop_compact_storage_enabled: false
# Guardrail to allow/disallow DROP KEYSPACE statements
# drop_keyspace_enabled: true
#
+# Guardrail to allow/disallow bulk load of SSTables
+# bulk_load_enabled: true
+#
# Guardrail to warn or fail when using a page size greater than threshold.
# The two thresholds default to -1 to disable.
# page_size_warn_threshold: -1
diff --git a/src/java/org/apache/cassandra/config/Config.java
b/src/java/org/apache/cassandra/config/Config.java
index 2157b225eb..d7fc1df3fe 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -860,6 +860,7 @@ public class Config
public volatile boolean user_timestamps_enabled = true;
public volatile boolean alter_table_enabled = true;
public volatile boolean group_by_enabled = true;
+ public volatile boolean bulk_load_enabled = true;
public volatile boolean drop_truncate_table_enabled = true;
public volatile boolean drop_keyspace_enabled = true;
public volatile boolean secondary_indexes_enabled = true;
diff --git a/src/java/org/apache/cassandra/config/GuardrailsOptions.java
b/src/java/org/apache/cassandra/config/GuardrailsOptions.java
index b0cb259c26..321946d03e 100644
--- a/src/java/org/apache/cassandra/config/GuardrailsOptions.java
+++ b/src/java/org/apache/cassandra/config/GuardrailsOptions.java
@@ -364,6 +364,20 @@ public class GuardrailsOptions implements GuardrailsConfig
x -> config.drop_keyspace_enabled = x);
}
+ @Override
+ public boolean getBulkLoadEnabled()
+ {
+ return config.bulk_load_enabled;
+ }
+
+ public void setBulkLoadEnabled(boolean enabled)
+ {
+ updatePropertyWithLogging("bulk_load_enabled",
+ enabled,
+ () -> config.bulk_load_enabled,
+ x -> config.bulk_load_enabled = x);
+ }
+
@Override
public boolean getSecondaryIndexesEnabled()
{
diff --git a/src/java/org/apache/cassandra/db/guardrails/Guardrail.java
b/src/java/org/apache/cassandra/db/guardrails/Guardrail.java
index fbd1a5b880..0f6831cbbc 100644
--- a/src/java/org/apache/cassandra/db/guardrails/Guardrail.java
+++ b/src/java/org/apache/cassandra/db/guardrails/Guardrail.java
@@ -64,6 +64,9 @@ public abstract class Guardrail
/** Time of last failure in milliseconds. */
private volatile long lastFailInMs = 0;
+ /** Should throw exception if null client state is provided. */
+ private volatile boolean throwOnNullClientState = false;
+
Guardrail(String name, @Nullable String reason)
{
this.name = name;
@@ -136,7 +139,7 @@ public abstract class Guardrail
GuardrailsDiagnostics.failed(name,
decorateMessage(redactedMessage));
}
- if (state != null)
+ if (state != null || throwOnNullClientState)
throw new GuardrailViolatedException(message);
}
@@ -169,6 +172,19 @@ public abstract class Guardrail
return this;
}
+ /**
+ * Note: this method is not thread safe and should only be used during guardrail initialization
+ *
+ * @param shouldThrow whether a violation should throw an exception even when no client state is provided;
+ * the default, {@code false}, means no exception is thrown when the client state is null.
+ * @return current guardrail
+ */
+ Guardrail throwOnNullClientState(boolean shouldThrow)
+ {
+ this.throwOnNullClientState = shouldThrow;
+ return this;
+ }
+
/**
* reset last notify time to make sure it will notify downstream when
{@link this#warn(String, String)}
* or {@link this#fail(String, ClientState)} is called next time.
diff --git a/src/java/org/apache/cassandra/db/guardrails/Guardrails.java
b/src/java/org/apache/cassandra/db/guardrails/Guardrails.java
index 4eb1084e51..f7a6fed2b7 100644
--- a/src/java/org/apache/cassandra/db/guardrails/Guardrails.java
+++ b/src/java/org/apache/cassandra/db/guardrails/Guardrails.java
@@ -184,6 +184,15 @@ public final class Guardrails implements GuardrailsMBean
state ->
CONFIG_PROVIDER.getOrCreate(state).getDropKeyspaceEnabled(),
"DROP KEYSPACE functionality");
+ /**
+ * Guardrail disabling bulk loading of SSTables
+ */
+ public static final EnableFlag bulkLoadEnabled =
+ (EnableFlag) new EnableFlag("bulk_load_enabled",
+ "Bulk loading of SSTables might potentially destabilize the
node.",
+ state ->
CONFIG_PROVIDER.getOrCreate(state).getBulkLoadEnabled(),
+ "Bulk loading of SSTables").throwOnNullClientState(true);
+
/**
* Guardrail disabling user's ability to turn off compression
*/
@@ -779,6 +788,18 @@ public final class Guardrails implements GuardrailsMBean
DEFAULT_CONFIG.setDropKeyspaceEnabled(enabled);
}
+ @Override
+ public boolean getBulkLoadEnabled()
+ {
+ return DEFAULT_CONFIG.getBulkLoadEnabled();
+ }
+
+ @Override
+ public void setBulkLoadEnabled(boolean enabled)
+ {
+ DEFAULT_CONFIG.setBulkLoadEnabled(enabled);
+ }
+
@Override
public int getPageSizeWarnThreshold()
{
diff --git a/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java
b/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java
index ac6e5842e1..0c3947addb 100644
--- a/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java
+++ b/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java
@@ -175,6 +175,13 @@ public interface GuardrailsConfig
*/
boolean getDropKeyspaceEnabled();
+ /**
+ * Returns whether bulk load of SSTables is allowed
+ *
+ * @return {@code true} if allowed, {@code false} otherwise.
+ */
+ boolean getBulkLoadEnabled();
+
/**
* @return The threshold to warn when page size exceeds given size.
*/
diff --git a/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
b/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
index f05ae4366a..afa3be2193 100644
--- a/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
+++ b/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
@@ -316,6 +316,18 @@ public interface GuardrailsMBean
*/
void setDropKeyspaceEnabled(boolean enabled);
+ /**
+ * Returns whether bulk load of SSTables is allowed
+ *
+ * @return {@code true} if allowed, {@code false} otherwise.
+ */
+ boolean getBulkLoadEnabled();
+
+ /**
+ * Sets whether bulk load of SSTables is allowed
+ */
+ void setBulkLoadEnabled(boolean enabled);
+
/**
* @return The threshold to warn when requested page size greater than
threshold.
* -1 means disabled.
diff --git
a/src/java/org/apache/cassandra/streaming/StreamDeserializingTask.java
b/src/java/org/apache/cassandra/streaming/StreamDeserializingTask.java
index 3785249430..70dd91a1ad 100644
--- a/src/java/org/apache/cassandra/streaming/StreamDeserializingTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamDeserializingTask.java
@@ -22,6 +22,8 @@ import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.db.guardrails.GuardrailViolatedException;
+import org.apache.cassandra.db.guardrails.Guardrails;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.streaming.messages.KeepAliveMessage;
import org.apache.cassandra.streaming.messages.StreamMessage;
@@ -70,10 +72,23 @@ public class StreamDeserializingTask implements Runnable
if (session == null)
session = deriveSession(message);
- if (logger.isDebugEnabled())
- logger.debug("{} Received {}", createLogTag(session,
channel), message);
-
- session.messageReceived(message);
+ if (session.getStreamOperation() == StreamOperation.BULK_LOAD)
+ {
+ try
+ {
+ Guardrails.bulkLoadEnabled.ensureEnabled(null);
+ receiveMessage(message);
+ }
+ catch (GuardrailViolatedException ex)
+ {
+ logger.warn("{} Aborting {}. Bulk load of SSTables is
not allowed.", createLogTag(session, channel), message);
+ session.abort();
+ }
+ }
+ else
+ {
+ receiveMessage(message);
+ }
}
}
catch (Throwable t)
@@ -111,4 +126,12 @@ public class StreamDeserializingTask implements Runnable
streamSession.attachInbound(channel);
return streamSession;
}
+
+ private void receiveMessage(StreamMessage message)
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("{} Received {}", createLogTag(session, channel),
message);
+
+ session.messageReceived(message);
+ }
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailBulkLoadEnabledTest.java
b/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailBulkLoadEnabledTest.java
new file mode 100644
index 0000000000..83a4675b8b
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailBulkLoadEnabledTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.guardrails;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.guardrails.Guardrails;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.tools.BulkLoader;
+import org.apache.cassandra.tools.ToolRunner;
+
+import static com.google.common.collect.Lists.transform;
+import static java.util.stream.Collectors.toList;
+import static org.apache.cassandra.db.ColumnFamilyStore.FlushReason.UNIT_TESTS;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.shared.AssertUtils.row;
+import static org.apache.cassandra.distributed.test.ExecUtil.rethrow;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests the guardrail for bulk load, {@link Guardrails#bulkLoadEnabled}.
+ */
+public class GuardrailBulkLoadEnabledTest extends GuardrailTester
+{
+ private static Path tempDir;
+ private static Cluster cluster;
+ private static String nodes;
+ private static int nativePort;
+ private static int storagePort;
+
+ public static Path tempDir() throws Exception
+ {
+ return Files.createTempDirectory("GuardrailBulkLoadEnabledTest");
+ }
+
+ @Override
+ protected Cluster getCluster()
+ {
+ return cluster;
+ }
+
+ @BeforeClass
+ public static void setupCluster() throws Exception
+ {
+ tempDir = tempDir();
+ cluster = init(Cluster.build(1).withConfig(c ->
c.with(NATIVE_PROTOCOL, NETWORK, GOSSIP)).start());
+ nodes = cluster.get(1).config().broadcastAddress().getHostString();
+ nativePort =
cluster.get(1).callOnInstance(DatabaseDescriptor::getNativeTransportPort);
+ storagePort =
cluster.get(1).callOnInstance(DatabaseDescriptor::getStoragePort);
+ }
+
+ /** Closes the cluster and removes the temporary SSTable directory together with its nested keyspace/table dirs. */
+ @AfterClass
+ public static void teardownCluster()
+ {
+ if (cluster != null)
+ cluster.close();
+
+ // tempDir is still null when setupCluster() failed before creating it; a flat tryDelete() cannot remove non-empty dirs
+ if (tempDir != null)
+ FileUtils.deleteQuietly(new File(tempDir).toJavaIOFile());
+ }
+
+ @Test
+ public void bulkLoaderEnabled() throws Throwable
+ {
+ File sstablesToUpload = prepareSstablesForUpload();
+ // bulk load SSTables work as expected
+ ToolRunner.ToolResult tool = loadData(sstablesToUpload);
+ tool.assertOnCleanExit();
+ assertTrue(tool.getStdout().contains("Summary statistics"));
+ assertRows(cluster.get(1).executeInternal("SELECT count(*) FROM
bulk_load_tables.test"), row(22L));
+
+ // truncate table
+ truncateGeneratedTables();
+ assertRows(cluster.get(1).executeInternal("SELECT * FROM
bulk_load_tables.test"), EMPTY_ROWS);
+
+ // Disable bulk load, stream should fail and no data should be loaded
+ cluster.get(1).runOnInstance(() ->
Guardrails.instance.setBulkLoadEnabled(false));
+ long mark = cluster.get(1).logs().mark();
+ tool = loadData(sstablesToUpload);
+
+ cluster.get(1).logs().watchFor(mark, "Bulk load of SSTables is not
allowed");
+ tool.asserts().failure().errorContains("Stream failed");
+ assertRows(cluster.get(1).executeInternal("SELECT * FROM
bulk_load_tables.test"), EMPTY_ROWS);
+
+ // Enable bulk load again, data should be loaded successfully
+ cluster.get(1).runOnInstance(() ->
Guardrails.instance.setBulkLoadEnabled(true));
+ tool = loadData(sstablesToUpload);
+ tool.assertOnCleanExit();
+ assertTrue(tool.getStdout().contains("Summary statistics"));
+ assertRows(cluster.get(1).executeInternal("SELECT count(*) FROM
bulk_load_tables.test"), row(22L));
+ }
+
+ private static ToolRunner.ToolResult loadData(File sstablesToUpload)
+ {
+ return ToolRunner.invokeClass(BulkLoader.class,
+ "--nodes", nodes,
+ "--port", Integer.toString(nativePort),
+ "--storage-port",
Integer.toString(storagePort),
+ sstablesToUpload.absolutePath());
+ }
+
+ private static File prepareSstablesForUpload() throws IOException
+ {
+ generateSSTables();
+ File sstableDir = copySStablesFromDataDir("test");
+ truncateGeneratedTables();
+ return sstableDir;
+ }
+
+ private static File copySStablesFromDataDir(String table) throws
IOException
+ {
+ File cfDir = new File(tempDir + File.pathSeparator() +
"bulk_load_tables" + File.pathSeparator() + table);
+ cfDir.tryCreateDirectories();
+ List<String> keyspaceDirPaths = cluster.get(1).callOnInstance(
+ () -> Keyspace.open("bulk_load_tables")
+ .getColumnFamilyStore(table)
+ .getDirectories()
+ .getCFDirectories()
+ .stream()
+ .map(File::absolutePath)
+ .collect(toList())
+ );
+ for (File srcDir : transform(keyspaceDirPaths, File::new))
+ {
+ for (File file : srcDir.tryList(File::isFile))
+ FileUtils.copyFileToDirectory(file.toJavaIOFile(),
cfDir.toJavaIOFile());
+ }
+ return cfDir;
+ }
+
+ private static void generateSSTables()
+ {
+ cluster.schemaChange("CREATE KEYSPACE IF NOT EXISTS bulk_load_tables
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}");
+ cluster.schemaChange("CREATE TABLE IF NOT EXISTS bulk_load_tables.test
(pk int, val text, PRIMARY KEY (pk))");
+ for (int i = 0; i < 22; i++)
+ {
+ cluster.get(1).executeInternal(String.format("INSERT INTO
bulk_load_tables.test (pk, val) VALUES (%s, '%s')", i, i));
+ }
+ cluster.get(1).runOnInstance(rethrow(() ->
StorageService.instance.forceKeyspaceFlush("bulk_load_tables", UNIT_TESTS)));
+ }
+
+ private static void truncateGeneratedTables()
+ {
+ cluster.get(1).executeInternal("TRUNCATE bulk_load_tables.test");
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]