This is an automated email from the ASF dual-hosted git repository.

smiklosovic pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 20d80118ac Add the ability to disable bulk loading of SSTables
20d80118ac is described below

commit 20d80118ac02d853d501e3f5c24a63a12d4010f5
Author: Runtian <[email protected]>
AuthorDate: Tue Aug 29 16:52:56 2023 -0700

    Add the ability to disable bulk loading of SSTables
    
    patch by Runtian Liu; reviewed by Stefan Miklosovic, Andres de la Peña and 
Brandon Williams for CASSANDRA-18781
---
 CHANGES.txt                                        |   1 +
 NEWS.txt                                           |   2 +
 conf/cassandra.yaml                                |   3 +
 src/java/org/apache/cassandra/config/Config.java   |   1 +
 .../apache/cassandra/config/GuardrailsOptions.java |  14 ++
 .../apache/cassandra/db/guardrails/Guardrail.java  |  18 ++-
 .../apache/cassandra/db/guardrails/Guardrails.java |  21 +++
 .../cassandra/db/guardrails/GuardrailsConfig.java  |   7 +
 .../cassandra/db/guardrails/GuardrailsMBean.java   |  12 ++
 .../streaming/StreamDeserializingTask.java         |  31 +++-
 .../guardrails/GuardrailBulkLoadEnabledTest.java   | 179 +++++++++++++++++++++
 11 files changed, 284 insertions(+), 5 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index f9fdb07388..9b38559d7d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Add the ability to disable bulk loading of SSTables (CASSANDRA-18781)
  * Clean up obsolete functions and simplify cql_version handling in cqlsh 
(CASSANDRA-18787)
 Merged from 5.0:
  * Do not log stacktrace on mismatched cache and schema version and checksum 
error in AutoSavingCache (CASSANDRA-18862)
diff --git a/NEWS.txt b/NEWS.txt
index c89978abb3..26de3a8045 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -71,6 +71,8 @@ using the provided 'sstableupgrade' tool.
 
 New features
 ------------
+    - New Guardrails added:
+      - Whether bulk loading of SSTables is allowed.
 
 Upgrading
 ---------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index c41c3cb973..1e861a2144 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1889,6 +1889,9 @@ drop_compact_storage_enabled: false
 # Guardrail to allow/disallow DROP KEYSPACE statements
 # drop_keyspace_enabled: true
 #
+# Guardrail to allow/disallow bulk load of SSTables
+# bulk_load_enabled: true
+#
 # Guardrail to warn or fail when using a page size greater than threshold.
 # The two thresholds default to -1 to disable.
 # page_size_warn_threshold: -1
diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java
index 2157b225eb..d7fc1df3fe 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -860,6 +860,7 @@ public class Config
     public volatile boolean user_timestamps_enabled = true;
     public volatile boolean alter_table_enabled = true;
     public volatile boolean group_by_enabled = true;
+    public volatile boolean bulk_load_enabled = true;
     public volatile boolean drop_truncate_table_enabled = true;
     public volatile boolean drop_keyspace_enabled = true;
     public volatile boolean secondary_indexes_enabled = true;
diff --git a/src/java/org/apache/cassandra/config/GuardrailsOptions.java 
b/src/java/org/apache/cassandra/config/GuardrailsOptions.java
index b0cb259c26..321946d03e 100644
--- a/src/java/org/apache/cassandra/config/GuardrailsOptions.java
+++ b/src/java/org/apache/cassandra/config/GuardrailsOptions.java
@@ -364,6 +364,20 @@ public class GuardrailsOptions implements GuardrailsConfig
                                   x -> config.drop_keyspace_enabled = x);
     }
 
+    @Override
+    public boolean getBulkLoadEnabled()
+    {
+        return config.bulk_load_enabled;
+    }
+
+    public void setBulkLoadEnabled(boolean enabled)
+    {
+        updatePropertyWithLogging("bulk_load_enabled",
+                                  enabled,
+                                  () -> config.bulk_load_enabled,
+                                  x -> config.bulk_load_enabled = x);
+    }
+
     @Override
     public boolean getSecondaryIndexesEnabled()
     {
diff --git a/src/java/org/apache/cassandra/db/guardrails/Guardrail.java 
b/src/java/org/apache/cassandra/db/guardrails/Guardrail.java
index fbd1a5b880..0f6831cbbc 100644
--- a/src/java/org/apache/cassandra/db/guardrails/Guardrail.java
+++ b/src/java/org/apache/cassandra/db/guardrails/Guardrail.java
@@ -64,6 +64,9 @@ public abstract class Guardrail
     /** Time of last failure in milliseconds. */
     private volatile long lastFailInMs = 0;
 
+    /** Whether a violation should throw an exception even when a null client state is provided. */
+    private volatile boolean throwOnNullClientState = false;
+
     Guardrail(String name, @Nullable String reason)
     {
         this.name = name;
@@ -136,7 +139,7 @@ public abstract class Guardrail
             GuardrailsDiagnostics.failed(name, 
decorateMessage(redactedMessage));
         }
 
-        if (state != null)
+        if (state != null || throwOnNullClientState)
             throw new GuardrailViolatedException(message);
     }
 
@@ -169,6 +172,19 @@ public abstract class Guardrail
         return this;
     }
 
+    /**
+     * Note: this method is not thread safe and should only be used during 
guardrail initialization
+     *
+     * @param shouldThrow whether an exception should be thrown when the guardrail is violated
+     *                    without a client state; the default, false, means no exception is 
thrown when client state is not provided.
+     * @return current guardrail
+     */
+    Guardrail throwOnNullClientState(boolean shouldThrow)
+    {
+        this.throwOnNullClientState = shouldThrow;
+        return this;
+    }
+
     /**
      * reset last notify time to make sure it will notify downstream when 
{@link this#warn(String, String)}
      * or {@link this#fail(String, ClientState)} is called next time.
diff --git a/src/java/org/apache/cassandra/db/guardrails/Guardrails.java 
b/src/java/org/apache/cassandra/db/guardrails/Guardrails.java
index 4eb1084e51..f7a6fed2b7 100644
--- a/src/java/org/apache/cassandra/db/guardrails/Guardrails.java
+++ b/src/java/org/apache/cassandra/db/guardrails/Guardrails.java
@@ -184,6 +184,15 @@ public final class Guardrails implements GuardrailsMBean
                     state -> 
CONFIG_PROVIDER.getOrCreate(state).getDropKeyspaceEnabled(),
                     "DROP KEYSPACE functionality");
 
+    /**
+     * Guardrail disabling bulk loading of SSTables
+     */
+    public static final EnableFlag bulkLoadEnabled =
+    (EnableFlag) new EnableFlag("bulk_load_enabled",
+                   "Bulk loading of SSTables might potentially destabilize the 
node.",
+                   state -> 
CONFIG_PROVIDER.getOrCreate(state).getBulkLoadEnabled(),
+                   "Bulk loading of SSTables").throwOnNullClientState(true);
+
     /**
      * Guardrail disabling user's ability to turn off compression
      */
@@ -779,6 +788,18 @@ public final class Guardrails implements GuardrailsMBean
         DEFAULT_CONFIG.setDropKeyspaceEnabled(enabled);
     }
 
+    @Override
+    public boolean getBulkLoadEnabled()
+    {
+        return DEFAULT_CONFIG.getBulkLoadEnabled();
+    }
+
+    @Override
+    public void setBulkLoadEnabled(boolean enabled)
+    {
+        DEFAULT_CONFIG.setBulkLoadEnabled(enabled);
+    }
+
     @Override
     public int getPageSizeWarnThreshold()
     {
diff --git a/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java 
b/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java
index ac6e5842e1..0c3947addb 100644
--- a/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java
+++ b/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java
@@ -175,6 +175,13 @@ public interface GuardrailsConfig
      */
     boolean getDropKeyspaceEnabled();
 
+    /**
+     * Returns whether bulk load of SSTables is allowed
+     *
+     * @return {@code true} if allowed, {@code false} otherwise.
+     */
+    boolean getBulkLoadEnabled();
+
     /**
      * @return The threshold to warn when page size exceeds given size.
      */
diff --git a/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java 
b/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
index f05ae4366a..afa3be2193 100644
--- a/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
+++ b/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
@@ -316,6 +316,18 @@ public interface GuardrailsMBean
      */
     void setDropKeyspaceEnabled(boolean enabled);
 
+    /**
+     * Returns whether bulk load of SSTables is allowed
+     *
+     * @return {@code true} if allowed, {@code false} otherwise.
+     */
+    boolean getBulkLoadEnabled();
+
+    /**
+     * Sets whether bulk load of SSTables is allowed
+     */
+    void setBulkLoadEnabled(boolean enabled);
+
     /**
      * @return The threshold to warn when requested page size greater than 
threshold.
      * -1 means disabled.
diff --git 
a/src/java/org/apache/cassandra/streaming/StreamDeserializingTask.java 
b/src/java/org/apache/cassandra/streaming/StreamDeserializingTask.java
index 3785249430..70dd91a1ad 100644
--- a/src/java/org/apache/cassandra/streaming/StreamDeserializingTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamDeserializingTask.java
@@ -22,6 +22,8 @@ import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.db.guardrails.GuardrailViolatedException;
+import org.apache.cassandra.db.guardrails.Guardrails;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.streaming.messages.KeepAliveMessage;
 import org.apache.cassandra.streaming.messages.StreamMessage;
@@ -70,10 +72,23 @@ public class StreamDeserializingTask implements Runnable
                 if (session == null)
                     session = deriveSession(message);
 
-                if (logger.isDebugEnabled())
-                    logger.debug("{} Received {}", createLogTag(session, 
channel), message);
-
-                session.messageReceived(message);
+                if (session.getStreamOperation() == StreamOperation.BULK_LOAD)
+                {
+                    try
+                    {
+                        Guardrails.bulkLoadEnabled.ensureEnabled(null);
+                        receiveMessage(message);
+                    }
+                    catch (GuardrailViolatedException ex)
+                    {
+                        logger.warn("{} Aborting {}. Bulk load of SSTables is 
not allowed.", createLogTag(session, channel), message);
+                        session.abort();
+                    }
+                }
+                else
+                {
+                    receiveMessage(message);
+                }
             }
         }
         catch (Throwable t)
@@ -111,4 +126,12 @@ public class StreamDeserializingTask implements Runnable
         streamSession.attachInbound(channel);
         return streamSession;
     }
+
+    private void receiveMessage(StreamMessage message)
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("{} Received {}", createLogTag(session, channel), 
message);
+
+        session.messageReceived(message);
+    }
 }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailBulkLoadEnabledTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailBulkLoadEnabledTest.java
new file mode 100644
index 0000000000..83a4675b8b
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailBulkLoadEnabledTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.guardrails;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.guardrails.Guardrails;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.tools.BulkLoader;
+import org.apache.cassandra.tools.ToolRunner;
+
+import static com.google.common.collect.Lists.transform;
+import static java.util.stream.Collectors.toList;
+import static org.apache.cassandra.db.ColumnFamilyStore.FlushReason.UNIT_TESTS;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.shared.AssertUtils.row;
+import static org.apache.cassandra.distributed.test.ExecUtil.rethrow;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests the guardrail for bulk load, {@link Guardrails#bulkLoadEnabled}.
+ */
+public class GuardrailBulkLoadEnabledTest extends GuardrailTester
+{
+    private static Path tempDir;
+    private static Cluster cluster;
+    private static String nodes;
+    private static int nativePort;
+    private static int storagePort;
+
+    public static Path tempDir() throws Exception
+    {
+        return Files.createTempDirectory("GuardrailBulkLoadEnabledTest");
+    }
+
+    @Override
+    protected Cluster getCluster()
+    {
+        return cluster;
+    }
+
+    @BeforeClass
+    public static void setupCluster() throws Exception
+    {
+        tempDir = tempDir();
+        cluster = init(Cluster.build(1).withConfig(c -> 
c.with(NATIVE_PROTOCOL, NETWORK, GOSSIP)).start());
+        nodes = cluster.get(1).config().broadcastAddress().getHostString();
+        nativePort = 
cluster.get(1).callOnInstance(DatabaseDescriptor::getNativeTransportPort);
+        storagePort = 
cluster.get(1).callOnInstance(DatabaseDescriptor::getStoragePort);
+    }
+
+    @AfterClass
+    public static void teardownCluster()
+    {
+        if (cluster != null)
+            cluster.close();
+
+        for (File f : new File(tempDir).tryList())
+        {
+            f.tryDelete();
+        }
+    }
+
+    @Test
+    public void bulkLoaderEnabled() throws Throwable
+    {
+        File sstablesToUpload = prepareSstablesForUpload();
+        // bulk loading of SSTables works as expected
+        ToolRunner.ToolResult tool = loadData(sstablesToUpload);
+        tool.assertOnCleanExit();
+        assertTrue(tool.getStdout().contains("Summary statistics"));
+        assertRows(cluster.get(1).executeInternal("SELECT count(*) FROM 
bulk_load_tables.test"), row(22L));
+
+        // truncate table
+        truncateGeneratedTables();
+        assertRows(cluster.get(1).executeInternal("SELECT * FROM 
bulk_load_tables.test"), EMPTY_ROWS);
+
+        // Disable bulk load, stream should fail and no data should be loaded
+        cluster.get(1).runOnInstance(() -> 
Guardrails.instance.setBulkLoadEnabled(false));
+        long mark = cluster.get(1).logs().mark();
+        tool = loadData(sstablesToUpload);
+
+        cluster.get(1).logs().watchFor(mark, "Bulk load of SSTables is not 
allowed");
+        tool.asserts().failure().errorContains("Stream failed");
+        assertRows(cluster.get(1).executeInternal("SELECT * FROM 
bulk_load_tables.test"), EMPTY_ROWS);
+
+        // Enable bulk load again, data should be loaded successfully
+        cluster.get(1).runOnInstance(() -> 
Guardrails.instance.setBulkLoadEnabled(true));
+        tool = loadData(sstablesToUpload);
+        tool.assertOnCleanExit();
+        assertTrue(tool.getStdout().contains("Summary statistics"));
+        assertRows(cluster.get(1).executeInternal("SELECT count(*) FROM 
bulk_load_tables.test"), row(22L));
+    }
+
+    private static ToolRunner.ToolResult loadData(File sstablesToUpload)
+    {
+        return ToolRunner.invokeClass(BulkLoader.class,
+                                      "--nodes", nodes,
+                                      "--port", Integer.toString(nativePort),
+                                      "--storage-port", 
Integer.toString(storagePort),
+                                      sstablesToUpload.absolutePath());
+    }
+
+    private static File prepareSstablesForUpload() throws IOException
+    {
+        generateSSTables();
+        File sstableDir = copySStablesFromDataDir("test");
+        truncateGeneratedTables();
+        return sstableDir;
+    }
+
+    private static File copySStablesFromDataDir(String table) throws 
IOException
+    {
+        File cfDir = new File(tempDir +  File.pathSeparator() + 
"bulk_load_tables" + File.pathSeparator() + table);
+        cfDir.tryCreateDirectories();
+        List<String> keyspaceDirPaths = cluster.get(1).callOnInstance(
+        () -> Keyspace.open("bulk_load_tables")
+                      .getColumnFamilyStore(table)
+                      .getDirectories()
+                      .getCFDirectories()
+                      .stream()
+                      .map(File::absolutePath)
+                      .collect(toList())
+        );
+        for (File srcDir : transform(keyspaceDirPaths, File::new))
+        {
+            for (File file : srcDir.tryList(File::isFile))
+                FileUtils.copyFileToDirectory(file.toJavaIOFile(), 
cfDir.toJavaIOFile());
+        }
+        return cfDir;
+    }
+
+    private static void generateSSTables()
+    {
+        cluster.schemaChange("CREATE KEYSPACE IF NOT EXISTS bulk_load_tables 
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}");
+        cluster.schemaChange("CREATE TABLE IF NOT EXISTS bulk_load_tables.test 
(pk int, val text, PRIMARY KEY (pk))");
+        for (int i = 0; i < 22; i++)
+        {
+            cluster.get(1).executeInternal(String.format("INSERT INTO 
bulk_load_tables.test (pk, val) VALUES (%s, '%s')", i, i));
+        }
+        cluster.get(1).runOnInstance(rethrow(() -> 
StorageService.instance.forceKeyspaceFlush("bulk_load_tables", UNIT_TESTS)));
+    }
+
+    private static void truncateGeneratedTables()
+    {
+        cluster.get(1).executeInternal("TRUNCATE bulk_load_tables.test");
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to