This is an automated email from the ASF dual-hosted git repository.
adelapena pushed a commit to branch cassandra-5.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-5.0 by this push:
new e8fb4b2f63 Ensure that empty SAI column indexes do not fail on
validation after full-SSTable streaming
e8fb4b2f63 is described below
commit e8fb4b2f63b32f337447992f9eb57a12e2afc0e4
Author: Andrés de la Peña <[email protected]>
AuthorDate: Mon Nov 13 12:46:53 2023 +0000
Ensure that empty SAI column indexes do not fail on validation after
full-SSTable streaming
patch by Andrés de la Peña; reviewed by Caleb Rackliffe for CASSANDRA-19017
Co-authored-by: Andrés de la Peña <[email protected]>
Co-authored-by: Caleb Rackliffe <[email protected]>
---
CHANGES.txt | 1 +
.../sai/disk/v1/ColumnCompletionMarkerUtil.java | 74 +++++++++++++++++++++
.../index/sai/disk/v1/MemtableIndexWriter.java | 6 +-
.../index/sai/disk/v1/SSTableIndexWriter.java | 4 +-
.../index/sai/disk/v1/V1OnDiskFormat.java | 77 +++++++++++++---------
.../distributed/test/sai/IndexStreamingTest.java | 23 +++----
6 files changed, 140 insertions(+), 45 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 9a4e2fa287..d744f50fa1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.0-beta1
+ * Ensure that empty SAI column indexes do not fail on validation after
full-SSTable streaming (CASSANDRA-19017)
* SAI in-memory index should check max term size (CASSANDRA-18926)
* Set default disk_access_mode to mmap_index_only (CASSANDRA-19021)
* Exclude net.java.dev.jna:jna dependency from dependencies of
org.caffinitas.ohc:ohc-core (CASSANDRA-18992)
diff --git
a/src/java/org/apache/cassandra/index/sai/disk/v1/ColumnCompletionMarkerUtil.java
b/src/java/org/apache/cassandra/index/sai/disk/v1/ColumnCompletionMarkerUtil.java
new file mode 100644
index 0000000000..760083ad9d
--- /dev/null
+++
b/src/java/org/apache/cassandra/index/sai/disk/v1/ColumnCompletionMarkerUtil.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.index.sai.disk.v1;
+
+import java.io.IOException;
+
+import org.apache.cassandra.index.sai.IndexContext;
+import org.apache.cassandra.index.sai.disk.format.IndexComponent;
+import org.apache.cassandra.index.sai.disk.format.IndexDescriptor;
+import org.apache.cassandra.index.sai.disk.io.IndexOutputWriter;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+
+/**
+ * Utility class for creating and reading the column completion marker, {@link
IndexComponent#COLUMN_COMPLETION_MARKER}.
+ * </p>
+ * The file has a header and a footer, as written by {@link
SAICodecUtils#writeHeader(IndexOutput)} and
+ * {@link SAICodecUtils#writeFooter(IndexOutput)}. The only content of the
file is a single byte indicating whether the
+ * column index is empty or not. If the index is empty the completion marker
will be the only per-index component.
+ */
+public class ColumnCompletionMarkerUtil
+{
+ private static final byte EMPTY = (byte) 1;
+ private static final byte NOT_EMPTY = (byte) 0;
+
+ /**
+ * Creates a column index completion marker for the specified column
index, storing in it whether the index is empty.
+ *
+ * @param descriptor the index descriptor
+ * @param context the column index context
+ * @param isEmpty whether the index is empty
+ */
+ public static void create(IndexDescriptor descriptor, IndexContext
context, boolean isEmpty) throws IOException
+ {
+ try (IndexOutputWriter output =
descriptor.openPerIndexOutput(IndexComponent.COLUMN_COMPLETION_MARKER, context))
+ {
+ SAICodecUtils.writeHeader(output);
+ output.writeByte(isEmpty ? EMPTY : NOT_EMPTY);
+ SAICodecUtils.writeFooter(output);
+ }
+ }
+
+ /**
+ * Reads the column index completion marker and returns whether if the
index is empty.
+ *
+ * @param descriptor the index descriptor
+ * @param context the column index context
+ * @return {@code true} if the index is empty, {@code false} otherwise.
+ */
+ public static boolean isEmptyIndex(IndexDescriptor descriptor,
IndexContext context) throws IOException
+ {
+ try (IndexInput input =
descriptor.openPerIndexInput(IndexComponent.COLUMN_COMPLETION_MARKER, context))
+ {
+ SAICodecUtils.checkHeader(input); // consume header
+ return input.readByte() == EMPTY;
+ }
+ }
+}
diff --git
a/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java
b/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java
index 10d30531f5..cd1495c3f6 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java
@@ -101,7 +101,8 @@ public class MemtableIndexWriter implements
PerColumnIndexWriter
logger.debug(indexContext.logMessage("No indexed rows to flush
from SSTable {}."), indexDescriptor.sstableDescriptor);
// Write a completion marker even though we haven't written
anything to the index,
// so we won't try to build the index again for the SSTable
-
indexDescriptor.createComponentOnDisk(IndexComponent.COLUMN_COMPLETION_MARKER,
indexContext);
+ ColumnCompletionMarkerUtil.create(indexDescriptor,
indexContext, true);
+
return;
}
@@ -204,7 +205,8 @@ public class MemtableIndexWriter implements
PerColumnIndexWriter
private void completeIndexFlush(long cellCount, long startTime, Stopwatch
stopwatch) throws IOException
{
-
indexDescriptor.createComponentOnDisk(IndexComponent.COLUMN_COMPLETION_MARKER,
indexContext);
+ // create a completion marker indicating that the index is complete and non-empty
+ ColumnCompletionMarkerUtil.create(indexDescriptor, indexContext,
false);
indexContext.getIndexMetrics().memtableIndexFlushCount.inc();
diff --git
a/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java
b/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java
index 6843206462..3f85c30225 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java
@@ -134,7 +134,9 @@ public class SSTableIndexWriter implements
PerColumnIndexWriter
}
writeSegmentsMetadata();
-
indexDescriptor.createComponentOnDisk(IndexComponent.COLUMN_COMPLETION_MARKER,
indexContext);
+
+ // write column index completion marker, indicating whether the
index is empty
+ ColumnCompletionMarkerUtil.create(indexDescriptor, indexContext,
segments.isEmpty());
}
finally
{
diff --git
a/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java
b/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java
index e4f8737de6..323d81131f 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java
@@ -24,6 +24,7 @@ import java.util.EnumSet;
import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
+
import org.apache.cassandra.utils.Throwables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -188,21 +189,7 @@ public class V1OnDiskFormat implements OnDiskFormat
{
if (isNotBuildCompletionMarker(indexComponent))
{
- try (IndexInput input =
indexDescriptor.openPerSSTableInput(indexComponent))
- {
- if (checksum)
- SAICodecUtils.validateChecksum(input);
- else
- SAICodecUtils.validate(input);
- }
- catch (Exception e)
- {
- logger.warn(indexDescriptor.logMessage("{} failed for
index component {} on SSTable {}."),
- checksum ?
"Checksum validation" : "Validation",
- indexComponent,
-
indexDescriptor.sstableDescriptor);
- rethrowIOException(e);
- }
+ validateIndexComponent(indexDescriptor, null, indexComponent,
checksum);
}
}
}
@@ -210,29 +197,57 @@ public class V1OnDiskFormat implements OnDiskFormat
+ /**
+ * Validates the per-column index components of the given column index.
+ * <p>
+ * If the column completion marker is present it is validated first and then read to find out whether the index is
+ * empty. An empty index has no per-index components other than the completion marker, so in that case no further
+ * components are validated. If the marker is absent, the index is treated as non-empty.
+ */
@Override
public void validatePerColumnIndexComponents(IndexDescriptor
indexDescriptor, IndexContext indexContext, boolean checksum)
{
+ // determine if the index is empty, which would be encoded in the
column completion marker
+ boolean isEmptyIndex = false;
+ if
(indexDescriptor.hasComponent(IndexComponent.COLUMN_COMPLETION_MARKER,
indexContext))
+ {
+ // first validate the file...
+ validateIndexComponent(indexDescriptor, indexContext,
IndexComponent.COLUMN_COMPLETION_MARKER, checksum);
+
+ // ...then read to check if the index is empty
+ try
+ {
+ isEmptyIndex =
ColumnCompletionMarkerUtil.isEmptyIndex(indexDescriptor, indexContext);
+ }
+ catch (IOException e)
+ {
+ rethrowIOException(e);
+ }
+ }
+
for (IndexComponent indexComponent :
perColumnIndexComponents(indexContext))
{
- if (isNotBuildCompletionMarker(indexComponent))
+ // an empty index has only the completion marker (already validated above), so there is nothing else to check;
+ // the marker itself is excluded here via isNotBuildCompletionMarker
+ if (!isEmptyIndex && isNotBuildCompletionMarker(indexComponent))
{
- try (IndexInput input =
indexDescriptor.openPerIndexInput(indexComponent, indexContext))
- {
- if (checksum)
- SAICodecUtils.validateChecksum(input);
- else
- SAICodecUtils.validate(input);
- }
- catch (Exception e)
- {
- logger.warn(indexDescriptor.logMessage("{} failed for
index component {} on SSTable {}"),
- checksum ?
"Checksum validation" : "Validation",
- indexComponent,
-
indexDescriptor.sstableDescriptor);
- rethrowIOException(e);
- }
+ validateIndexComponent(indexDescriptor, indexContext,
indexComponent, checksum);
}
}
}
+
+    /**
+     * Validates a single index component with {@link SAICodecUtils#validateChecksum} or {@link SAICodecUtils#validate}.
+     * <p>
+     * A {@code null} {@code indexContext} means the component is a per-SSTable component; otherwise it is opened as a
+     * per-index component of the given column index. Any failure, including a failure to open the component, is logged
+     * and handed to {@link #rethrowIOException(Exception)}.
+     *
+     * @param indexDescriptor the SSTable index descriptor
+     * @param indexContext the column index context, or {@code null} for per-SSTable components
+     * @param indexComponent the component to validate
+     * @param checksum {@code true} to use {@code validateChecksum}, {@code false} to use {@code validate}
+     */
+    private static void validateIndexComponent(IndexDescriptor indexDescriptor,
+                                               IndexContext indexContext,
+                                               IndexComponent indexComponent,
+                                               boolean checksum)
+    {
+        boolean perSSTableComponent = indexContext == null;
+        try (IndexInput input = perSSTableComponent
+                                ? indexDescriptor.openPerSSTableInput(indexComponent)
+                                : indexDescriptor.openPerIndexInput(indexComponent, indexContext))
+        {
+            if (!checksum)
+                SAICodecUtils.validate(input);
+            else
+                SAICodecUtils.validateChecksum(input);
+        }
+        catch (Exception e)
+        {
+            String operation = checksum ? "Checksum validation" : "Validation";
+            logger.warn(indexDescriptor.logMessage("{} failed for index component {} on SSTable {}"),
+                        operation,
+                        indexComponent,
+                        indexDescriptor.sstableDescriptor);
+            rethrowIOException(e);
+        }
+    }
+
private static void rethrowIOException(Exception e)
{
if (e instanceof IOException)
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingTest.java
b/test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingTest.java
index e637d56b87..c3f1f42073 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingTest.java
@@ -97,16 +97,10 @@ public class IndexStreamingTest extends TestBaseImpl
int numSSTableComponents = isWide ?
V1OnDiskFormat.WIDE_PER_SSTABLE_COMPONENTS.size() :
V1OnDiskFormat.SKINNY_PER_SSTABLE_COMPONENTS.size();
int numIndexComponents = isLiteral ?
V1OnDiskFormat.LITERAL_COMPONENTS.size() :
V1OnDiskFormat.NUMERIC_COMPONENTS.size();
- int numComponents = sstableStreamingComponentsCount() +
numSSTableComponents + numIndexComponents;
+ int numComponents = sstableStreamingComponentsCount() +
numSSTableComponents + numIndexComponents + 1;
- if (isLiteral)
- cluster.schemaChange(withKeyspace(
- "CREATE CUSTOM INDEX ON %s.test(literal) USING
'StorageAttachedIndex';"
- ));
- else
- cluster.schemaChange(withKeyspace(
- "CREATE CUSTOM INDEX ON %s.test(numeric) USING
'StorageAttachedIndex';"
- ));
+ cluster.schemaChange(withKeyspace("CREATE INDEX ON
%s.test(literal) USING 'sai';"));
+ cluster.schemaChange(withKeyspace("CREATE INDEX ON
%s.test(numeric) USING 'sai';"));
cluster.stream().forEach(i ->
i.nodetoolResult("disableautocompaction",
KEYSPACE).asserts().success()
@@ -115,12 +109,19 @@ public class IndexStreamingTest extends TestBaseImpl
IInvokableInstance second = cluster.get(2);
long sstableCount = 10;
long expectedFiles = isZeroCopyStreaming ? sstableCount *
numComponents : sstableCount;
+
for (int i = 0; i < sstableCount; i++)
{
if (isWide)
- first.executeInternal(withKeyspace("insert into
%s.test(pk, ck, literal, numeric, b) values (?, ?, ?, ?, ?)"), i, i, "v" + i,
i, BLOB);
+ {
+ String insertTemplate = "INSERT INTO %s.test(pk, ck, " +
(isLiteral ? "literal" : "numeric") + ", b) VALUES (?, ?, ?, ?)";
+ first.executeInternal(withKeyspace(insertTemplate), i, i,
isLiteral ? "v" + i : Integer.valueOf(i), BLOB);
+ }
else
- first.executeInternal(withKeyspace("insert into
%s.test(pk, literal, numeric, b) values (?, ?, ?, ?)"), i, "v" + i, i, BLOB);
+ {
+ String insertTemplate = "INSERT INTO %s.test(pk, " +
(isLiteral ? "literal" : "numeric") + ", b) VALUES (?, ?, ?)";
+ first.executeInternal(withKeyspace(insertTemplate), i,
isLiteral ? "v" + i : Integer.valueOf(i), BLOB);
+ }
first.flush(KEYSPACE);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]