This is an automated email from the ASF dual-hosted git repository.
jwest pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 92f7c8d Ignore COMPACT STORAGE flag for tables for which it's safe to
do so
92f7c8d is described below
commit 92f7c8db1444bf5d757cd50dba2211a446f3b22c
Author: Jordan West <[email protected]>
AuthorDate: Mon Aug 17 14:20:32 2020 -0700
Ignore COMPACT STORAGE flag for tables for which it's safe to do so
patch by Jordan West; reviewed by Marcus Eriksson and Caleb Rackliffe for
CASSANDRA-16048
---
CHANGES.txt | 1 +
.../apache/cassandra/schema/SchemaKeyspace.java | 60 ++++++-
.../cassandra/distributed/impl/Instance.java | 10 ++
.../upgrade/CompactStorage3to4UpgradeTest.java | 190 +++++++++++++++++++++
4 files changed, 255 insertions(+), 6 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 70bbd4f..04af373 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -35,6 +35,7 @@ Merged from 3.0:
* Add flag to ignore unreplicated keyspaces during repair (CASSANDRA-15160)
Merged from 2.2:
* Fixed a NullPointerException when calling nodetool enablethrift
(CASSANDRA-16127)
+ * Automatically drop compact storage on tables for which it is safe
(CASSANDRA-16048)
4.0-beta2
* Add addition incremental repair visibility to nodetool repair_admin
(CASSANDRA-14939)
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index 0333ee6..f6dbe03 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -866,15 +866,34 @@ public final class SchemaKeyspace
StringBuilder messages = new StringBuilder();
for (UntypedResultSet.Row row : query(query))
{
- if
(SchemaConstants.isLocalSystemKeyspace(row.getString("keyspace_name")))
+ String keyspaceName = row.getString("keyspace_name");
+ if (SchemaConstants.isLocalSystemKeyspace(keyspaceName))
continue;
- Set<String> flags = row.getFrozenSet("flags", UTF8Type.instance);
- if
(TableMetadata.Flag.isLegacyCompactTable(TableMetadata.Flag.fromStringSet(flags)))
+ Set<TableMetadata.Flag> flags =
TableMetadata.Flag.fromStringSet(row.getFrozenSet("flags", UTF8Type.instance));
+ if (TableMetadata.Flag.isLegacyCompactTable(flags))
{
- messages.append(String.format("ALTER TABLE %s.%s DROP COMPACT
STORAGE;\n",
-
maybeQuote(row.getString("keyspace_name")),
-
maybeQuote(row.getString("table_name"))));
+ String tableName = row.getString("table_name");
+ if (isSafeToDropCompactStorage(keyspaceName, tableName))
+ {
+ flags.remove(TableMetadata.Flag.DENSE);
+ flags.add(TableMetadata.Flag.COMPOUND);
+ String update = String.format("UPDATE %s.%s SET flags={%s}
WHERE keyspace_name='%s' AND table_name='%s'",
+
SchemaConstants.SCHEMA_KEYSPACE_NAME, TABLES,
+
TableMetadata.Flag.toStringSet(flags).stream()
+
.map(f -> "'" + f + "'")
+
.collect(Collectors.joining(", ")),
+ keyspaceName, tableName);
+
+ logger.info("Safely dropping COMPACT STORAGE on {}.{}",
keyspaceName, tableName);
+ executeInternal(update);
+ }
+ else
+ {
+ messages.append(String.format("ALTER TABLE %s.%s DROP
COMPACT STORAGE;\n",
+
maybeQuote(row.getString("keyspace_name")),
+ maybeQuote(tableName)));
+ }
}
}
@@ -889,6 +908,35 @@ public final class SchemaKeyspace
}
}
+ private static boolean isSafeToDropCompactStorage(String keyspaceName,
String tableName)
+ {
+ if
(!Boolean.parseBoolean(System.getProperty("cassandra.auto_drop_compact_storage",
"false")))
+ return false;
+
+ String columnQuery = String.format("SELECT kind, type FROM %s.%s WHERE
keyspace_name='%s' and table_name='%s'",
+
SchemaConstants.SCHEMA_KEYSPACE_NAME, COLUMNS, keyspaceName, tableName);
+
+ String simpleType = "empty";
+ int simpleCount = 0;
+ for (UntypedResultSet.Row row : query(columnQuery))
+ {
+ String kind = row.getString("kind");
+ if (kind.equalsIgnoreCase("partition_key") ||
kind.equalsIgnoreCase("clustering"))
+ continue;
+
+ if (kind.equalsIgnoreCase("static"))
+ return false;
+
+ simpleCount++; // if not partition, clustering, or static column
then its a regular columnb
+ simpleType = row.getString("type"); // only save one type becuase
if there is > 1 simple column then false is returned
+ }
+
+ if (simpleCount == 1 && !simpleType.equalsIgnoreCase("empty"))
+ return true;
+
+ return false;
+ }
+
private static Keyspaces fetchKeyspacesWithout(Set<String>
excludedKeyspaceNames)
{
String query = format("SELECT keyspace_name FROM %s.%s",
SchemaConstants.SCHEMA_KEYSPACE_NAME, KEYSPACES);
diff --git
a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 9a3fd08..edd525d 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -79,6 +79,7 @@ import org.apache.cassandra.distributed.api.SimpleQueryResult;
import org.apache.cassandra.distributed.mock.nodetool.InternalNodeProbe;
import org.apache.cassandra.distributed.mock.nodetool.InternalNodeProbeFactory;
import org.apache.cassandra.distributed.shared.InstanceClassLoader;
+import org.apache.cassandra.exceptions.StartupException;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.VersionedValue;
@@ -103,6 +104,7 @@ import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.DefaultFSErrorHandler;
import org.apache.cassandra.service.PendingRangeCalculatorService;
import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.StartupChecks;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.StorageServiceMBean;
import org.apache.cassandra.streaming.StreamReceiveTask;
@@ -415,6 +417,14 @@ public class Instance extends IsolatedExecutor implements
IInvokableInstance
DatabaseDescriptor.createAllDirectories();
CommitLog.instance.start();
+ try
+ {
+ new StartupChecks().withDefaultTests().verify();
+ } catch (StartupException e)
+ {
+ throw e;
+ }
+
// We need to persist this as soon as possible after startup
checks.
// This should be the first write to SystemKeyspace
(CASSANDRA-11742)
SystemKeyspace.persistLocalMetadata();
diff --git
a/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorage3to4UpgradeTest.java
b/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorage3to4UpgradeTest.java
new file mode 100644
index 0000000..5317517
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorage3to4UpgradeTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.distributed.shared.Versions;
+import org.apache.cassandra.exceptions.StartupException;
+import org.apache.cassandra.schema.TableMetadata;
+
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+
+public class CompactStorage3to4UpgradeTest extends UpgradeTestBase
+{
+
+ public static final String TABLE_NAME = "cs_tbl";
+ public static final String CREATE_TABLE_C1_R1 = String.format(
+ "CREATE TABLE %s.%s (key int, c1 int, v int, PRIMARY KEY (key, c1))
WITH COMPACT STORAGE",
+ KEYSPACE, TABLE_NAME);
+ public static final String CREATE_TABLE_C1_ONLY = String.format(
+ "CREATE TABLE %s.%s (key int, c1 int, PRIMARY KEY (key, c1)) WITH
COMPACT STORAGE",
+ KEYSPACE, TABLE_NAME);
+ public static final String CREATE_TABLE_R_ONLY = String.format(
+ "CREATE TABLE %s.%s (key int, c1 int, c2 int, PRIMARY KEY (key)) WITH
COMPACT STORAGE",
+ KEYSPACE, TABLE_NAME);
+
+ public static final String INSERT_C1_R1 = String.format(
+ "INSERT INTO %s.%s (key, c1, v) VALUES (?, ?, ?)",
+ KEYSPACE, TABLE_NAME);
+
+ @Test
+ public void ignoreDenseCompoundTablesWithValueColumn() throws Throwable
+ {
+ System.setProperty("cassandra.auto_drop_compact_storage", "true");
+ final int partitions = 10;
+ final int rowsPerPartition = 10;
+
+ DropCompactTestHelper helper = new DropCompactTestHelper();
+ new TestCase()
+ .nodes(2)
+ .upgrade(Versions.Major.v30, Versions.Major.v4)
+ .setup(cluster -> {
+ cluster.schemaChange(CREATE_TABLE_C1_R1);
+
+ ICoordinator coordinator = cluster.coordinator(1);
+ for (int i = 1; i <= partitions; i++)
+ for (int j = 1; j <= rowsPerPartition; j++)
+ coordinator.execute(INSERT_C1_R1, ConsistencyLevel.ALL, i,
j, i + j);
+
+
+ runQueries(coordinator, helper, new String[]{
+ String.format("SELECT * FROM %s.%s", KEYSPACE, TABLE_NAME),
+
+ String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d",
+ KEYSPACE, TABLE_NAME, partitions - 3,
rowsPerPartition - 2),
+
+ String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d",
+ KEYSPACE, TABLE_NAME, partitions - 1,
rowsPerPartition - 5),
+
+ String.format("SELECT * FROM %s.%s WHERE key = %d and c1 > %d",
+ KEYSPACE, TABLE_NAME, partitions - 8,
rowsPerPartition - 3),
+ });
+ })
+ .runAfterNodeUpgrade((cluster, node) -> {
+ validateResults(helper, cluster, 1);
+ validateResults(helper, cluster, 2);
+
+ String flagQuery = String.format("SELECT flags FROM
system_schema.tables WHERE keyspace_name='%s' and table_name='%s'", KEYSPACE,
TABLE_NAME);
+ Object[][] results = cluster.get(node).executeInternal(flagQuery);
+ if (results.length != 1)
+ Assert.fail("failed to find table flags with query: " +
flagQuery);
+
+ Set<String> flags = (Set) results[0][0];
+ Assert.assertTrue("missing compound flag",
flags.contains("compound"));
+ Assert.assertFalse("found dense flag", flags.contains("dense"));
+ })
+ .run();
+ }
+
+ @Test
+ public void failOnCompactClusteredTablesWithValueOutColumn() throws
Throwable
+ {
+ try
+ {
+ new TestCase()
+ .nodes(2)
+ .upgrade(Versions.Major.v30, Versions.Major.v4)
+ .setup(cluster -> cluster.schemaChange(CREATE_TABLE_C1_ONLY))
+ .runAfterNodeUpgrade((cluster, node) -> {
+ Assert.fail("should never run because we don't expect the node
to start");
+ })
+ .run();
+ } catch (RuntimeException e)
+ {
+ validateError(e);
+ }
+ }
+
+ @Test
+ public void failOnCompactTablesWithNoClustering() throws Throwable
+ {
+ try
+ {
+ new TestCase()
+ .nodes(2)
+ .upgrade(Versions.Major.v30, Versions.Major.v4)
+ .setup(cluster -> cluster.schemaChange(CREATE_TABLE_R_ONLY))
+ .runAfterNodeUpgrade((cluster, node) -> {
+ Assert.fail("should never run because we don't expect the node
to start");
+ })
+ .run();
+ } catch (RuntimeException e)
+ {
+ validateError(e);
+ }
+ }
+
+
+ public void validateResults(DropCompactTestHelper helper,
UpgradeableCluster cluster, int node)
+ {
+ validateResults(helper, cluster, node, ConsistencyLevel.ALL);
+ }
+
+ public void validateResults(DropCompactTestHelper helper,
UpgradeableCluster cluster, int node, ConsistencyLevel cl)
+ {
+ for (Map.Entry<String, Object[][]> entry :
helper.queriesAndResults().entrySet())
+ {
+ Object[][] postUpgradeResult =
cluster.coordinator(node).execute(entry.getKey(), cl);
+ assertRows(postUpgradeResult, entry.getValue());
+ }
+
+ }
+
+ private void runQueries(ICoordinator coordinator, DropCompactTestHelper
helper, String[] queries)
+ {
+ for (String query : queries)
+ helper.addResult(query, coordinator.execute(query,
ConsistencyLevel.ALL));
+ }
+
+ private void validateError(Throwable t)
+ {
+ Throwable cause = t.getCause();
+ if (cause instanceof StartupException)
+ {
+ Assert.assertTrue("Message was: " + cause.getMessage(),
+ cause.getMessage().contains(String.format("ALTER
TABLE %s.%s DROP COMPACT STORAGE", KEYSPACE, TABLE_NAME)));
+ }
+
+ }
+
+ public static class DropCompactTestHelper
+ {
+ final private Map<String, Object[][]> preUpgradeResults = new
HashMap<>();
+
+ public void addResult(String query, Object[][] results)
+ {
+ preUpgradeResults.put(query, results);
+ }
+
+ public Map<String, Object[][]> queriesAndResults()
+ {
+ return preUpgradeResults;
+ }
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]