This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new acd815898d Node Draining Should Abort All Current SSTables Imports
acd815898d is described below

commit acd815898d1e5669e7d8ee2469808f6e8a14a7df
Author: Yuriy Semchyshyn <[email protected]>
AuthorDate: Fri Apr 7 09:52:44 2023 -0700

    Node Draining Should Abort All Current SSTables Imports
    
    patch by Yuriy Semchyshyn; reviewed by Dinesh Joshi, Yifan Cai for 
CASSANDRA-18373
---
 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/db/SSTableImporter.java   |  31 ++++--
 .../cassandra/service/StorageServiceDrainTest.java | 108 +++++++++++++++++++++
 3 files changed, 134 insertions(+), 6 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index d960691b5b..d3ad294f62 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.0
+ * Node Draining Should Abort All Current SSTables Imports (CASSANDRA-18373)
  * Use snake case for the names of CQL native functions (CASSANDRA-18037)
  * Use jdk-dependent checkstyle version to check the source code 
(CASSANDRA-18262)
  * Provide summary of failed SessionInfo's in StreamResultFuture 
(CASSANDRA-17199)
diff --git a/src/java/org/apache/cassandra/db/SSTableImporter.java 
b/src/java/org/apache/cassandra/db/SSTableImporter.java
index 87402130cf..66a56dc3cb 100644
--- a/src/java/org/apache/cassandra/db/SSTableImporter.java
+++ b/src/java/org/apache/cassandra/db/SSTableImporter.java
@@ -41,6 +41,7 @@ import 
org.apache.cassandra.io.sstable.format.SSTableFormat.Components;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.OutputHandler;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.concurrent.Refs;
@@ -95,19 +96,20 @@ public class SSTableImporter
                     {
                         try
                         {
+                            abortIfDraining();
                             verifySSTableForImport(descriptor, 
entry.getValue(), options.verifyTokens, options.verifySSTables, 
options.extendedVerify);
                         }
                         catch (Throwable t)
                         {
                             if (dir != null)
                             {
-                                logger.error("[{}] Failed verifying sstable {} 
in directory {}", importID, descriptor, dir, t);
+                                logger.error("[{}] Failed verifying SSTable {} 
in directory {}", importID, descriptor, dir, t);
                                 failedDirectories.add(dir);
                             }
                             else
                             {
-                                logger.error("[{}] Failed verifying sstable 
{}", importID, descriptor, t);
-                                throw new RuntimeException("Failed verifying 
sstable "+descriptor, t);
+                                logger.error("[{}] Failed verifying SSTable 
{}", importID, descriptor, t);
+                                throw new RuntimeException("Failed verifying 
SSTable " + descriptor, t);
                             }
                             break;
                         }
@@ -130,6 +132,7 @@ public class SSTableImporter
             {
                 try
                 {
+                    abortIfDraining();
                     Descriptor oldDescriptor = entry.getKey();
                     if (currentDescriptors.contains(oldDescriptor))
                         continue;
@@ -162,8 +165,8 @@ public class SSTableImporter
                     }
                     else
                     {
-                        logger.error("[{}] Failed importing sstables from data 
directory - renamed sstables are: {}", importID, movedSSTables);
-                        throw new RuntimeException("Failed importing 
sstables", t);
+                        logger.error("[{}] Failed importing sstables from data 
directory - renamed SSTables are: {}", importID, movedSSTables, t);
+                        throw new RuntimeException("Failed importing 
SSTables", t);
                     }
                 }
             }
@@ -182,19 +185,35 @@ public class SSTableImporter
 
         try (Refs<SSTableReader> refs = Refs.ref(newSSTables))
         {
+            abortIfDraining();
             cfs.getTracker().addSSTables(newSSTables);
             for (SSTableReader reader : newSSTables)
             {
                 if (options.invalidateCaches && cfs.isRowCacheEnabled())
                     invalidateCachesForSSTable(reader);
             }
-
+        }
+        catch (Throwable t)
+        {
+            logger.error("[{}] Failed adding SSTables", importID, t);
+            throw new RuntimeException("Failed adding SSTables", t);
         }
 
         logger.info("[{}] Done loading load new SSTables for {}/{}", importID, 
cfs.keyspace.getName(), cfs.getTableName());
         return failedDirectories;
     }
 
+    /**
+     * Checks the state of this node and throws an {@link InterruptedException} 
if it is currently draining, so that an in-progress SSTables import stops early.
+     *
+     * @throws InterruptedException if {@code StorageService.instance.isDraining()} returns true
+     */
+    private static void abortIfDraining() throws InterruptedException
+    {
+        if (StorageService.instance.isDraining())
+            throw new InterruptedException("SSTables import has been aborted");
+    }
+
     private void logLeveling(UUID importID, Set<SSTableReader> newSSTables)
     {
         StringBuilder sb = new StringBuilder();
diff --git 
a/test/unit/org/apache/cassandra/service/StorageServiceDrainTest.java 
b/test/unit/org/apache/cassandra/service/StorageServiceDrainTest.java
new file mode 100644
index 0000000000..f8da0964a9
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/StorageServiceDrainTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.concurrent.Executors;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.dht.ByteOrderedPartitioner.BytesToken;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertTrue;
+
+public class StorageServiceDrainTest // checks that importing SSTables fails once the node has started draining
+{
+    private static final String KEYSPACE = "keyspace";
+    private static final String TABLE = "table";
+    private static final String COLUMN = "column";
+    private static final int ROWS = 1000; // number of rows written by before()
+
+    @Before
+    public void before() throws UnknownHostException // sets up the server, the test keyspace/table, a token and ROWS flushed rows
+    {
+        DatabaseDescriptor.daemonInitialization();
+        DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true);
+
+        CommitLog.instance.start();
+
+        CompactionManager.instance.disableAutoCompaction();
+
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), 
SchemaLoader.standardCFMD(KEYSPACE, TABLE));
+
+        StorageService.instance
+                .getTokenMetadata()
+                .updateNormalToken(new BytesToken((new byte[]{50})), 
InetAddressAndPort.getByName("127.0.0.1"));
+
+        final ColumnFamilyStore table = 
Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE);
+        for (int row = 0; row < ROWS; row++) // one update per row; the row index (as a string) is both the key and the value
+        {
+            final ByteBuffer value = ByteBufferUtil.bytes(String.valueOf(row));
+            new RowUpdateBuilder(table.metadata(), System.currentTimeMillis(), 
value)
+                    .clustering(ByteBufferUtil.bytes(COLUMN))
+                    .add("val", value)
+                    .build()
+                    .applyUnsafe();
+        }
+        Util.flush(table); // flush the table after the writes above
+    }
+
+    @Test
+    public void testSSTablesImportAbort() // import succeeds before the drain and is aborted once the drain is in progress
+    {
+        final ColumnFamilyStore table = 
Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE);
+
+        assertTrue(table // before draining: the import completes and returns an empty result
+                .importNewSSTables(Collections.emptySet(), false, false, 
false, false, false, false, false)
+                .isEmpty());
+
+        Executors.newSingleThreadExecutor().execute(() -> { // run drain() on a background thread; NOTE(review): this executor is never shut down
+                try
+                {
+                    StorageService.instance.drain();
+                }
+                catch (final Exception exception)
+                {
+                    throw new RuntimeException(exception); // surfaces only on the background thread, not in the test thread
+                }});
+
+        while (!StorageService.instance.isDraining()) // busy-wait until the node reports that it is draining
+            Thread.yield();
+
+        assertThatThrownBy(() -> table // while draining: the import must fail with a RuntimeException caused by an InterruptedException
+                .importNewSSTables(Collections.emptySet(), false, false, 
false, false, false, false, false))
+                .isInstanceOf(RuntimeException.class)
+                .hasCauseInstanceOf(InterruptedException.class);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to