This is an automated email from the ASF dual-hosted git repository.
ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new acd815898d Node Draining Should Abort All Current SSTables Imports
acd815898d is described below
commit acd815898d1e5669e7d8ee2469808f6e8a14a7df
Author: Yuriy Semchyshyn <[email protected]>
AuthorDate: Fri Apr 7 09:52:44 2023 -0700
Node Draining Should Abort All Current SSTables Imports
patch by Yuriy Semchyshyn; reviewed by Dinesh Joshi, Yifan Cai for
CASSANDRA-18373
---
CHANGES.txt | 1 +
.../org/apache/cassandra/db/SSTableImporter.java | 31 ++++--
.../cassandra/service/StorageServiceDrainTest.java | 108 +++++++++++++++++++++
3 files changed, 134 insertions(+), 6 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index d960691b5b..d3ad294f62 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.0
+ * Node Draining Should Abort All Current SSTables Imports (CASSANDRA-18373)
* Use snake case for the names of CQL native functions (CASSANDRA-18037)
* Use jdk-dependent checkstyle version to check the source code
(CASSANDRA-18262)
* Provide summary of failed SessionInfo's in StreamResultFuture
(CASSANDRA-17199)
diff --git a/src/java/org/apache/cassandra/db/SSTableImporter.java
b/src/java/org/apache/cassandra/db/SSTableImporter.java
index 87402130cf..66a56dc3cb 100644
--- a/src/java/org/apache/cassandra/db/SSTableImporter.java
+++ b/src/java/org/apache/cassandra/db/SSTableImporter.java
@@ -41,6 +41,7 @@ import
org.apache.cassandra.io.sstable.format.SSTableFormat.Components;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.OutputHandler;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.Refs;
@@ -95,19 +96,20 @@ public class SSTableImporter
{
try
{
+ abortIfDraining();
verifySSTableForImport(descriptor,
entry.getValue(), options.verifyTokens, options.verifySSTables,
options.extendedVerify);
}
catch (Throwable t)
{
if (dir != null)
{
- logger.error("[{}] Failed verifying sstable {}
in directory {}", importID, descriptor, dir, t);
+ logger.error("[{}] Failed verifying SSTable {}
in directory {}", importID, descriptor, dir, t);
failedDirectories.add(dir);
}
else
{
- logger.error("[{}] Failed verifying sstable
{}", importID, descriptor, t);
- throw new RuntimeException("Failed verifying
sstable "+descriptor, t);
+ logger.error("[{}] Failed verifying SSTable
{}", importID, descriptor, t);
+ throw new RuntimeException("Failed verifying
SSTable " + descriptor, t);
}
break;
}
@@ -130,6 +132,7 @@ public class SSTableImporter
{
try
{
+ abortIfDraining();
Descriptor oldDescriptor = entry.getKey();
if (currentDescriptors.contains(oldDescriptor))
continue;
@@ -162,8 +165,8 @@ public class SSTableImporter
}
else
{
- logger.error("[{}] Failed importing sstables from data
directory - renamed sstables are: {}", importID, movedSSTables);
- throw new RuntimeException("Failed importing
sstables", t);
+ logger.error("[{}] Failed importing sstables from data
directory - renamed SSTables are: {}", importID, movedSSTables, t);
+ throw new RuntimeException("Failed importing
SSTables", t);
}
}
}
@@ -182,19 +185,35 @@ public class SSTableImporter
try (Refs<SSTableReader> refs = Refs.ref(newSSTables))
{
+ abortIfDraining();
cfs.getTracker().addSSTables(newSSTables);
for (SSTableReader reader : newSSTables)
{
if (options.invalidateCaches && cfs.isRowCacheEnabled())
invalidateCachesForSSTable(reader);
}
-
+ }
+ catch (Throwable t)
+ {
+ logger.error("[{}] Failed adding SSTables", importID, t);
+ throw new RuntimeException("Failed adding SSTables", t);
}
logger.info("[{}] Done loading load new SSTables for {}/{}", importID,
cfs.keyspace.getName(), cfs.getTableName());
return failedDirectories;
}
+ /**
+ * Checks the state of this node and throws an {@link InterruptedException}
if it is currently draining
+ *
+ * @throws InterruptedException if the node is draining
+ */
+ private static void abortIfDraining() throws InterruptedException
+ {
+ if (StorageService.instance.isDraining())
+ throw new InterruptedException("SSTables import has been aborted");
+ }
+
private void logLeveling(UUID importID, Set<SSTableReader> newSSTables)
{
StringBuilder sb = new StringBuilder();
diff --git
a/test/unit/org/apache/cassandra/service/StorageServiceDrainTest.java
b/test/unit/org/apache/cassandra/service/StorageServiceDrainTest.java
new file mode 100644
index 0000000000..f8da0964a9
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/StorageServiceDrainTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.concurrent.Executors;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.dht.ByteOrderedPartitioner.BytesToken;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertTrue;
+
+public class StorageServiceDrainTest
+{
+ private static final String KEYSPACE = "keyspace";
+ private static final String TABLE = "table";
+ private static final String COLUMN = "column";
+ private static final int ROWS = 1000;
+
+ @Before
+ public void before() throws UnknownHostException
+ {
+ DatabaseDescriptor.daemonInitialization();
+ DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true);
+
+ CommitLog.instance.start();
+
+ CompactionManager.instance.disableAutoCompaction();
+
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1),
SchemaLoader.standardCFMD(KEYSPACE, TABLE));
+
+ StorageService.instance
+ .getTokenMetadata()
+ .updateNormalToken(new BytesToken((new byte[]{50})),
InetAddressAndPort.getByName("127.0.0.1"));
+
+ final ColumnFamilyStore table =
Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE);
+ for (int row = 0; row < ROWS; row++)
+ {
+ final ByteBuffer value = ByteBufferUtil.bytes(String.valueOf(row));
+ new RowUpdateBuilder(table.metadata(), System.currentTimeMillis(),
value)
+ .clustering(ByteBufferUtil.bytes(COLUMN))
+ .add("val", value)
+ .build()
+ .applyUnsafe();
+ }
+ Util.flush(table);
+ }
+
+ @Test
+ public void testSSTablesImportAbort()
+ {
+ final ColumnFamilyStore table =
Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE);
+
+ assertTrue(table
+ .importNewSSTables(Collections.emptySet(), false, false,
false, false, false, false, false)
+ .isEmpty());
+
+ Executors.newSingleThreadExecutor().execute(() -> {
+ try
+ {
+ StorageService.instance.drain();
+ }
+ catch (final Exception exception)
+ {
+ throw new RuntimeException(exception);
+ }});
+
+ while (!StorageService.instance.isDraining())
+ Thread.yield();
+
+ assertThatThrownBy(() -> table
+ .importNewSSTables(Collections.emptySet(), false, false,
false, false, false, false, false))
+ .isInstanceOf(RuntimeException.class)
+ .hasCauseInstanceOf(InterruptedException.class);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]