This is an automated email from the ASF dual-hosted git repository.
paulo pushed a commit to branch cassandra-5.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-5.0 by this push:
new 45afd18a13 Automatically disable zero-copy streaming for legacy
sstables
45afd18a13 is described below
commit 45afd18a13d6f2d4b229edad44488264471c93a8
Author: Paulo Motta <[email protected]>
AuthorDate: Wed Dec 24 15:37:42 2025 -0500
Automatically disable zero-copy streaming for legacy sstables
Legacy Cassandra 3.x sstables use an old bloom filter format that is
incompatible with zero-copy streaming in Cassandra 5.0+. This patch
automatically detects sstables with the old bloom filter format
(pre-4.0) and disables zero-copy streaming for them, allowing legacy
sstables to be loaded via sstableloader without requiring manual flags.
The fix adds a version check in
CassandraOutgoingFile.computeShouldStreamEntireSSTables()
that calls descriptor.version.hasOldBfFormat() to detect legacy sstables.
patch by Paulo Motta; reviewed by Stefan Miklosovic for CASSANDRA-21092
---
CHANGES.txt | 1 +
.../db/streaming/CassandraOutgoingFile.java | 7 +-
.../io/sstable/SSTableLoaderLegacyTest.java | 208 +++++++++++++++++++++
3 files changed, 214 insertions(+), 2 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 51c34c6936..e70fc6e23b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.0.7
+ * Automatically disable zero-copy streaming for legacy sstables with old
bloom filter format (CASSANDRA-21092)
* Fix CQLSSTableWriter serialization of vector of date and time
(CASSANDRA-20979)
* Correctly calculate default for FailureDetector max interval
(CASSANDRA-21025)
* Adding missing configs in system_views.settings to be backward compatible
(CASSANDRA-20863)
diff --git
a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
index 7572749d37..9fc04e7e0d 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
@@ -180,8 +180,11 @@ public class CassandraOutgoingFile implements
OutgoingStream
@VisibleForTesting
public boolean computeShouldStreamEntireSSTables()
{
- // don't stream if full sstable transfers are disabled or legacy
counter shards are present
- if (!DatabaseDescriptor.streamEntireSSTables() ||
ref.get().getSSTableMetadata().hasLegacyCounterShards)
+ // don't stream if full sstable transfers are disabled, legacy counter
shards are present,
+ // or sstable uses old bloom filter format (pre-4.0) which is
incompatible with zero-copy streaming
+ if (!DatabaseDescriptor.streamEntireSSTables() ||
+ ref.get().getSSTableMetadata().hasLegacyCounterShards ||
+ ref.get().descriptor.version.hasOldBfFormat())
return false;
return contained(sections, ref.get());
diff --git
a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderLegacyTest.java
b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderLegacyTest.java
new file mode 100644
index 0000000000..91d9d45bce
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderLegacyTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadataRef;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.StreamEvent;
+import org.apache.cassandra.streaming.StreamEventHandler;
+import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.OutputHandler;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests SSTableLoader with legacy sstables from Cassandra 3.x
+ */
+public class SSTableLoaderLegacyTest
+{
+ public static final String KEYSPACE1 = "sstableloaderlegacytest";
+ public static final String LEGACY_VERSION = "me"; // Cassandra 3.11
+ public static final String LEGACY_TABLE = "legacy_me_simple";
+
+ private static File LEGACY_SSTABLE_ROOT;
+ private File tmpdir;
+
+ @BeforeClass
+ public static void defineSchema()
+ {
+ String scp =
CassandraRelevantProperties.TEST_LEGACY_SSTABLE_ROOT.getString();
+ if (scp == null || scp.isEmpty())
+ {
+ throw new RuntimeException("System property for legacy sstable
root is not set.");
+ }
+ LEGACY_SSTABLE_ROOT = new File(scp).toAbsolute();
+
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(1));
+
+ // Create table matching the legacy sstable schema
+ // legacy_me_simple has schema: pk text PRIMARY KEY, val text
+ QueryProcessor.executeInternal(String.format(
+ "CREATE TABLE %s.%s (pk text PRIMARY KEY, val text)",
+ KEYSPACE1, LEGACY_TABLE));
+
+ StorageService.instance.initServer();
+ }
+
+ @Before
+ public void setup() throws IOException
+ {
+ tmpdir = new
File(Files.createTempDirectory("sstableloaderlegacytest").toFile());
+ }
+
+ @After
+ public void cleanup()
+ {
+ FileUtils.deleteRecursive(tmpdir);
+ }
+
+ /**
+ * Test that loading legacy 3.11 sstables works automatically.
+ * Zero-copy streaming is automatically disabled for legacy sstables that
use the old bloom filter format.
+ */
+ @Test
+ public void testLoadLegacy311SSTable() throws Exception
+ {
+ assertTrue("Zero-copy streaming should be enabled by default",
+ DatabaseDescriptor.streamEntireSSTables());
+
+ File dataDir = setupLegacySSTableDirectory();
+ ColumnFamilyStore cfs =
Keyspace.open(KEYSPACE1).getColumnFamilyStore(LEGACY_TABLE);
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ SSTableLoader loader = new SSTableLoader(dataDir, new TestClient(),
+ new
OutputHandler.SystemOutput(false, false));
+
+ loader.stream(Collections.emptySet(),
completionStreamListener(latch)).get();
+ latch.await();
+
+ assertTrue("Data should be loaded from legacy sstable",
+ !Util.getAll(Util.cmd(cfs).build()).isEmpty());
+ }
+
+ private static final class TestClient extends SSTableLoader.Client
+ {
+ private String keyspace;
+
+ public void init(String keyspace)
+ {
+ this.keyspace = keyspace;
+ for (Replica replica :
StorageService.instance.getLocalReplicas(KEYSPACE1))
+ addRangeForEndpoint(replica.range(),
FBUtilities.getBroadcastAddressAndPort());
+ }
+
+ public TableMetadataRef getTableMetadata(String tableName)
+ {
+ return Schema.instance.getTableMetadataRef(keyspace, tableName);
+ }
+ }
+
+ /**
+ * Sets up a directory with legacy 3.11 sstables copied from test data.
+ */
+ private File setupLegacySSTableDirectory() throws IOException
+ {
+ File dataDir = new File(tmpdir, KEYSPACE1 + "/" + LEGACY_TABLE);
+ if (!dataDir.exists())
+ dataDir.createDirectoriesIfNotExists();
+
+ File legacyTableDir = new File(LEGACY_SSTABLE_ROOT,
+ String.format("%s/legacy_tables/%s",
LEGACY_VERSION, LEGACY_TABLE));
+
+ if (!legacyTableDir.isDirectory())
+ {
+ throw new RuntimeException("Legacy sstable directory not found: "
+ legacyTableDir);
+ }
+
+ // Copy all sstable components to the test directory
+ File[] sourceFiles = legacyTableDir.tryList();
+ if (sourceFiles != null)
+ {
+ for (File sourceFile : sourceFiles)
+ {
+ copyFile(sourceFile, new File(dataDir, sourceFile.name()));
+ }
+ }
+
+ System.out.println("Copied legacy sstables from: " + legacyTableDir);
+ System.out.println("To: " + dataDir);
+ File[] copiedFiles = dataDir.tryList();
+ System.out.println("File count: " + (copiedFiles != null ?
copiedFiles.length : 0));
+
+ return dataDir;
+ }
+
+ /**
+ * Copies a file from source to target.
+ */
+ private static void copyFile(File sourceFile, File targetFile) throws
IOException
+ {
+ if (sourceFile.isFile())
+ {
+ Files.copy(sourceFile.toPath(), targetFile.toPath(),
StandardCopyOption.REPLACE_EXISTING);
+ }
+ }
+
+ /**
+ * Creates a stream completion listener.
+ */
+ private StreamEventHandler completionStreamListener(final CountDownLatch
latch)
+ {
+ return new StreamEventHandler()
+ {
+ public void onFailure(Throwable arg0)
+ {
+ latch.countDown();
+ }
+
+ public void onSuccess(StreamState arg0)
+ {
+ latch.countDown();
+ }
+
+ public void handleStreamEvent(StreamEvent event)
+ {
+ }
+ };
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]