This is an automated email from the ASF dual-hosted git repository. adelapena pushed a commit to branch cassandra-5.0 in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-5.0 by this push: new e8fb4b2f63 Ensure that empty SAI column indexes do not fail on validation after full-SSTable streaming e8fb4b2f63 is described below commit e8fb4b2f63b32f337447992f9eb57a12e2afc0e4 Author: Andrés de la Peña <a.penya.gar...@gmail.com> AuthorDate: Mon Nov 13 12:46:53 2023 +0000 Ensure that empty SAI column indexes do not fail on validation after full-SSTable streaming patch by Andrés de la Peña; reviewed by Caleb Rackliffe for CASSANDRA-19017 Co-authored-by: Andrés de la Peña <a.penya.gar...@gmail.com> Co-authored-by: Caleb Rackliffe <calebrackli...@gmail.com> --- CHANGES.txt | 1 + .../sai/disk/v1/ColumnCompletionMarkerUtil.java | 74 +++++++++++++++++++++ .../index/sai/disk/v1/MemtableIndexWriter.java | 6 +- .../index/sai/disk/v1/SSTableIndexWriter.java | 4 +- .../index/sai/disk/v1/V1OnDiskFormat.java | 77 +++++++++++++--------- .../distributed/test/sai/IndexStreamingTest.java | 23 +++---- 6 files changed, 140 insertions(+), 45 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 9a4e2fa287..d744f50fa1 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.0-beta1 + * Ensure that empty SAI column indexes do not fail on validation after full-SSTable streaming (CASSANDRA-19017) * SAI in-memory index should check max term size (CASSANDRA-18926) * Set default disk_access_mode to mmap_index_only (CASSANDRA-19021) * Exclude net.java.dev.jna:jna dependency from dependencies of org.caffinitas.ohc:ohc-core (CASSANDRA-18992) diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/ColumnCompletionMarkerUtil.java b/src/java/org/apache/cassandra/index/sai/disk/v1/ColumnCompletionMarkerUtil.java new file mode 100644 index 0000000000..760083ad9d --- /dev/null +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/ColumnCompletionMarkerUtil.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.index.sai.disk.v1; + +import java.io.IOException; + +import org.apache.cassandra.index.sai.IndexContext; +import org.apache.cassandra.index.sai.disk.format.IndexComponent; +import org.apache.cassandra.index.sai.disk.format.IndexDescriptor; +import org.apache.cassandra.index.sai.disk.io.IndexOutputWriter; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; + +/** + * Utility class for creating and reading the column completion marker, {@link IndexComponent#COLUMN_COMPLETION_MARKER}. + * <p> + * The file has a header and a footer, as written by {@link SAICodecUtils#writeHeader(IndexOutput)} and + * {@link SAICodecUtils#writeFooter(IndexOutput)}. The only content of the file is a single byte indicating whether the + * column index is empty or not. If the index is empty the completion marker will be the only per-index component. + */ +public class ColumnCompletionMarkerUtil +{ + private static final byte EMPTY = (byte) 1; + private static final byte NOT_EMPTY = (byte) 0; + + /** + * Creates a column index completion marker for the specified column index, storing in it whether the index is empty. 
+ * + * @param descriptor the index descriptor + * @param context the column index context + * @param isEmpty whether the index is empty + */ + public static void create(IndexDescriptor descriptor, IndexContext context, boolean isEmpty) throws IOException + { + try (IndexOutputWriter output = descriptor.openPerIndexOutput(IndexComponent.COLUMN_COMPLETION_MARKER, context)) + { + SAICodecUtils.writeHeader(output); + output.writeByte(isEmpty ? EMPTY : NOT_EMPTY); + SAICodecUtils.writeFooter(output); + } + } + + /** + * Reads the column index completion marker and returns whether the index is empty. + * + * @param descriptor the index descriptor + * @param context the column index context + * @return {@code true} if the index is empty, {@code false} otherwise. + */ + public static boolean isEmptyIndex(IndexDescriptor descriptor, IndexContext context) throws IOException + { + try (IndexInput input = descriptor.openPerIndexInput(IndexComponent.COLUMN_COMPLETION_MARKER, context)) + { + SAICodecUtils.checkHeader(input); // consume header + return input.readByte() == EMPTY; + } + } +} diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java b/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java index 10d30531f5..cd1495c3f6 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java @@ -101,7 +101,8 @@ public class MemtableIndexWriter implements PerColumnIndexWriter logger.debug(indexContext.logMessage("No indexed rows to flush from SSTable {}."), indexDescriptor.sstableDescriptor); // Write a completion marker even though we haven't written anything to the index, // so we won't try to build the index again for the SSTable - indexDescriptor.createComponentOnDisk(IndexComponent.COLUMN_COMPLETION_MARKER, indexContext); + ColumnCompletionMarkerUtil.create(indexDescriptor, indexContext, true); + return; } @@ -204,7 +205,8 @@ 
public class MemtableIndexWriter implements PerColumnIndexWriter private void completeIndexFlush(long cellCount, long startTime, Stopwatch stopwatch) throws IOException { - indexDescriptor.createComponentOnDisk(IndexComponent.COLUMN_COMPLETION_MARKER, indexContext); + // create a completion marker indicating that the index is complete and not-empty + ColumnCompletionMarkerUtil.create(indexDescriptor, indexContext, false); indexContext.getIndexMetrics().memtableIndexFlushCount.inc(); diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java b/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java index 6843206462..3f85c30225 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java @@ -134,7 +134,9 @@ public class SSTableIndexWriter implements PerColumnIndexWriter } writeSegmentsMetadata(); - indexDescriptor.createComponentOnDisk(IndexComponent.COLUMN_COMPLETION_MARKER, indexContext); + + // write column index completion marker, indicating whether the index is empty + ColumnCompletionMarkerUtil.create(indexDescriptor, indexContext, segments.isEmpty()); } finally { diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java b/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java index e4f8737de6..323d81131f 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java @@ -24,6 +24,7 @@ import java.util.EnumSet; import java.util.Set; import com.google.common.annotations.VisibleForTesting; + import org.apache.cassandra.utils.Throwables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -188,21 +189,7 @@ public class V1OnDiskFormat implements OnDiskFormat { if (isNotBuildCompletionMarker(indexComponent)) { - try (IndexInput input = indexDescriptor.openPerSSTableInput(indexComponent)) - { 
- if (checksum) - SAICodecUtils.validateChecksum(input); - else - SAICodecUtils.validate(input); - } - catch (Exception e) - { - logger.warn(indexDescriptor.logMessage("{} failed for index component {} on SSTable {}."), - checksum ? "Checksum validation" : "Validation", - indexComponent, - indexDescriptor.sstableDescriptor); - rethrowIOException(e); - } + validateIndexComponent(indexDescriptor, null, indexComponent, checksum); } } } @@ -210,29 +197,57 @@ public class V1OnDiskFormat implements OnDiskFormat @Override public void validatePerColumnIndexComponents(IndexDescriptor indexDescriptor, IndexContext indexContext, boolean checksum) { + // determine if the index is empty, which would be encoded in the column completion marker + boolean isEmptyIndex = false; + if (indexDescriptor.hasComponent(IndexComponent.COLUMN_COMPLETION_MARKER, indexContext)) + { + // first validate the file... + validateIndexComponent(indexDescriptor, indexContext, IndexComponent.COLUMN_COMPLETION_MARKER, checksum); + + // ...then read to check if the index is empty + try + { + isEmptyIndex = ColumnCompletionMarkerUtil.isEmptyIndex(indexDescriptor, indexContext); + } + catch (IOException e) + { + rethrowIOException(e); + } + } + for (IndexComponent indexComponent : perColumnIndexComponents(indexContext)) { - if (isNotBuildCompletionMarker(indexComponent)) + if (!isEmptyIndex && isNotBuildCompletionMarker(indexComponent)) { - try (IndexInput input = indexDescriptor.openPerIndexInput(indexComponent, indexContext)) - { - if (checksum) - SAICodecUtils.validateChecksum(input); - else - SAICodecUtils.validate(input); - } - catch (Exception e) - { - logger.warn(indexDescriptor.logMessage("{} failed for index component {} on SSTable {}"), - checksum ? 
"Checksum validation" : "Validation", - indexComponent, - indexDescriptor.sstableDescriptor); - rethrowIOException(e); - } + validateIndexComponent(indexDescriptor, indexContext, indexComponent, checksum); } } } + private static void validateIndexComponent(IndexDescriptor indexDescriptor, + IndexContext indexContext, + IndexComponent indexComponent, + boolean checksum) + { + try (IndexInput input = indexContext == null + ? indexDescriptor.openPerSSTableInput(indexComponent) + : indexDescriptor.openPerIndexInput(indexComponent, indexContext)) + { + if (checksum) + SAICodecUtils.validateChecksum(input); + else + SAICodecUtils.validate(input); + } + catch (Exception e) + { + logger.warn(indexDescriptor.logMessage("{} failed for index component {} on SSTable {}"), + checksum ? "Checksum validation" : "Validation", + indexComponent, + indexDescriptor.sstableDescriptor); + rethrowIOException(e); + } + } + private static void rethrowIOException(Exception e) { if (e instanceof IOException) diff --git a/test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingTest.java b/test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingTest.java index e637d56b87..c3f1f42073 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingTest.java @@ -97,16 +97,10 @@ public class IndexStreamingTest extends TestBaseImpl int numSSTableComponents = isWide ? V1OnDiskFormat.WIDE_PER_SSTABLE_COMPONENTS.size() : V1OnDiskFormat.SKINNY_PER_SSTABLE_COMPONENTS.size(); int numIndexComponents = isLiteral ? 
V1OnDiskFormat.LITERAL_COMPONENTS.size() : V1OnDiskFormat.NUMERIC_COMPONENTS.size(); - int numComponents = sstableStreamingComponentsCount() + numSSTableComponents + numIndexComponents; + int numComponents = sstableStreamingComponentsCount() + numSSTableComponents + numIndexComponents + 1; - if (isLiteral) - cluster.schemaChange(withKeyspace( - "CREATE CUSTOM INDEX ON %s.test(literal) USING 'StorageAttachedIndex';" - )); - else - cluster.schemaChange(withKeyspace( - "CREATE CUSTOM INDEX ON %s.test(numeric) USING 'StorageAttachedIndex';" - )); + cluster.schemaChange(withKeyspace("CREATE INDEX ON %s.test(literal) USING 'sai';")); + cluster.schemaChange(withKeyspace("CREATE INDEX ON %s.test(numeric) USING 'sai';")); cluster.stream().forEach(i -> i.nodetoolResult("disableautocompaction", KEYSPACE).asserts().success() @@ -115,12 +109,19 @@ public class IndexStreamingTest extends TestBaseImpl IInvokableInstance second = cluster.get(2); long sstableCount = 10; long expectedFiles = isZeroCopyStreaming ? sstableCount * numComponents : sstableCount; + for (int i = 0; i < sstableCount; i++) { if (isWide) - first.executeInternal(withKeyspace("insert into %s.test(pk, ck, literal, numeric, b) values (?, ?, ?, ?, ?)"), i, i, "v" + i, i, BLOB); + { + String insertTemplate = "INSERT INTO %s.test(pk, ck, " + (isLiteral ? "literal" : "numeric") + ", b) VALUES (?, ?, ?, ?)"; + first.executeInternal(withKeyspace(insertTemplate), i, i, isLiteral ? "v" + i : Integer.valueOf(i), BLOB); + } else - first.executeInternal(withKeyspace("insert into %s.test(pk, literal, numeric, b) values (?, ?, ?, ?)"), i, "v" + i, i, BLOB); + { + String insertTemplate = "INSERT INTO %s.test(pk, " + (isLiteral ? "literal" : "numeric") + ", b) VALUES (?, ?, ?)"; + first.executeInternal(withKeyspace(insertTemplate), i, isLiteral ? 
"v" + i : Integer.valueOf(i), BLOB); + } first.flush(KEYSPACE); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org