This is an automated email from the ASF dual-hosted git repository. adelapena pushed a commit to branch cassandra-5.0 in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-5.0 by this push: new 91a242fe00 SAI in-memory index should check max term size 91a242fe00 is described below commit 91a242fe00c9cc740d6d80df61240eebf05f4458 Author: Andrés de la Peña <a.penya.gar...@gmail.com> AuthorDate: Fri Nov 17 15:28:29 2023 +0000 SAI in-memory index should check max term size patch by Zhao Yang and Andrés de la Peña; reviewed by Zhao Yang for CASSANDRA-18926 Co-authored-by: Zhao Yang <zhaoyangsingap...@gmail.com> Co-authored-by: Andrés de la Peña <a.penya.gar...@gmail.com> --- CHANGES.txt | 1 + .../apache/cassandra/index/sai/IndexContext.java | 76 ++++++++++++++++++++++ .../cassandra/index/sai/StorageAttachedIndex.java | 6 +- .../index/sai/disk/v1/SSTableIndexWriter.java | 23 +------ .../index/sai/memory/TrieMemoryIndex.java | 2 + .../index/sai/memory/VectorMemoryIndex.java | 2 +- test/unit/org/apache/cassandra/cql3/CQLTester.java | 6 ++ .../index/sai/cql/StorageAttachedIndexDDLTest.java | 47 +++++++++++++ 8 files changed, 139 insertions(+), 24 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 5102f80bff..9a4e2fa287 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.0-beta1 + * SAI in-memory index should check max term size (CASSANDRA-18926) * Set default disk_access_mode to mmap_index_only (CASSANDRA-19021) * Exclude net.java.dev.jna:jna dependency from dependencies of org.caffinitas.ohc:ohc-core (CASSANDRA-18992) * Add UCS sstable_growth and min_sstable_size options (CASSANDRA-18945) diff --git a/src/java/org/apache/cassandra/index/sai/IndexContext.java b/src/java/org/apache/cassandra/index/sai/IndexContext.java index 26ef903c40..b88d59b6e6 100644 --- a/src/java/org/apache/cassandra/index/sai/IndexContext.java +++ b/src/java/org/apache/cassandra/index/sai/IndexContext.java @@ -25,6 +25,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Objects; import java.util.Set; +import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import com.google.common.base.MoreObjects; @@ -62,14 +63,28 @@ import org.apache.cassandra.index.sai.view.View; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.IndexMetadata; +import org.apache.cassandra.service.ClientWarn; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.NoSpamLogger; import org.apache.cassandra.utils.Pair; +import static org.apache.cassandra.config.CassandraRelevantProperties.SAI_MAX_FROZEN_TERM_SIZE; +import static org.apache.cassandra.config.CassandraRelevantProperties.SAI_MAX_STRING_TERM_SIZE; +import static org.apache.cassandra.config.CassandraRelevantProperties.SAI_MAX_VECTOR_TERM_SIZE; + /** * Manages metadata for each column index. */ public class IndexContext { private static final Logger logger = LoggerFactory.getLogger(IndexContext.class); + private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES); + + public static final long MAX_STRING_TERM_SIZE = SAI_MAX_STRING_TERM_SIZE.getSizeInBytes(); + public static final long MAX_FROZEN_TERM_SIZE = SAI_MAX_FROZEN_TERM_SIZE.getSizeInBytes(); + public static final long MAX_VECTOR_TERM_SIZE = SAI_MAX_VECTOR_TERM_SIZE.getSizeInBytes(); + public static final String TERM_OVERSIZE_MESSAGE = "Can't add term of column %s to index for key: %s, term size %s " + + "max allowed size %s, use analyzed = true (if not yet set) for that column."; private static final Set<AbstractType<?>> EQ_ONLY_TYPES = ImmutableSet.of(UTF8Type.instance, AsciiType.instance, @@ -98,6 +113,8 @@ public class IndexContext private final AbstractAnalyzer.AnalyzerFactory analyzerFactory; private final PrimaryKey.Factory primaryKeyFactory; + private final long maxTermSize; + public IndexContext(String keyspace, String table, AbstractType<?> partitionKeyType, @@ -128,6 +145,8 @@ public class IndexContext this.analyzerFactory = indexMetadata == null ? AbstractAnalyzer.fromOptions(getValidator(), Collections.emptyMap()) : AbstractAnalyzer.fromOptions(getValidator(), indexMetadata.options); + + maxTermSize = isVector() ? MAX_VECTOR_TERM_SIZE : (isFrozen() ? MAX_FROZEN_TERM_SIZE : MAX_STRING_TERM_SIZE); } public boolean hasClustering() @@ -509,4 +528,61 @@ public class IndexContext .mapToLong(SSTableIndex::indexFileCacheSize) .sum(); } + + /** + * Validate maximum term size for given row + */ + public void validateMaxTermSizeForRow(DecoratedKey key, Row row, boolean sendClientWarning) + { + AbstractAnalyzer analyzer = getAnalyzerFactory().create(); + if (isNonFrozenCollection()) + { + Iterator<ByteBuffer> bufferIterator = getValuesOf(row, FBUtilities.nowInSeconds()); + while (bufferIterator != null && bufferIterator.hasNext()) + validateMaxTermSizeForCell(analyzer, key, bufferIterator.next(), sendClientWarning); + } + else + { + ByteBuffer value = getValueOf(key, row, FBUtilities.nowInSeconds()); + validateMaxTermSizeForCell(analyzer, key, value, sendClientWarning); + } + } + + private void validateMaxTermSizeForCell(AbstractAnalyzer analyzer, DecoratedKey key, @Nullable ByteBuffer cellBuffer, boolean sendClientWarning) + { + if (cellBuffer == null || cellBuffer.remaining() == 0) + return; + + // analyzer should not return terms that are larger than the origin value. + if (cellBuffer.remaining() <= maxTermSize) + return; + + analyzer.reset(cellBuffer.duplicate()); + while (analyzer.hasNext()) + validateMaxTermSize(key, analyzer.next(), sendClientWarning); + } + + /** + * Validate maximum term size for given term + * @return true if given term is valid; otherwise false. + */ + public boolean validateMaxTermSize(DecoratedKey key, ByteBuffer term, boolean sendClientWarning) + { + if (term.remaining() > maxTermSize) + { + String message = logMessage(String.format(TERM_OVERSIZE_MESSAGE, + getColumnName(), + keyValidator().getString(key.getKey()), + FBUtilities.prettyPrintMemory(term.remaining()), + FBUtilities.prettyPrintMemory(maxTermSize))); + + if (sendClientWarning) + ClientWarn.instance.warn(message); + + noSpamLogger.warn(message); + return false; + } + + return true; + } } diff --git a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java index e78e2be50c..6ada975010 100644 --- a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java +++ b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java @@ -625,7 +625,11 @@ public class StorageAttachedIndex implements Index @Override public void validate(PartitionUpdate update) throws InvalidRequestException - {} + { + DecoratedKey key = update.partitionKey(); + for (Row row : update) + indexContext.validateMaxTermSizeForRow(key, row, true); + } /** * This method is called by the startup tasks to find SSTables that don't have indexes. The method is diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java b/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java index b8ffc72cae..6843206462 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java @@ -44,11 +44,6 @@ import org.apache.cassandra.index.sai.utils.PrimaryKey; import org.apache.cassandra.index.sai.utils.TypeUtil; import org.apache.cassandra.utils.Clock; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.NoSpamLogger; - -import static org.apache.cassandra.config.CassandraRelevantProperties.SAI_MAX_FROZEN_TERM_SIZE; -import static org.apache.cassandra.config.CassandraRelevantProperties.SAI_MAX_STRING_TERM_SIZE; -import static org.apache.cassandra.config.CassandraRelevantProperties.SAI_MAX_VECTOR_TERM_SIZE; /** * Column index writer that accumulates (on-heap) indexed data from a compacted SSTable as it's being flushed to disk. @@ -57,20 +52,12 @@ import static org.apache.cassandra.config.CassandraRelevantProperties.SAI_MAX_VE public class SSTableIndexWriter implements PerColumnIndexWriter { private static final Logger logger = LoggerFactory.getLogger(SSTableIndexWriter.class); - private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES); - - public static final long MAX_STRING_TERM_SIZE = SAI_MAX_STRING_TERM_SIZE.getSizeInBytes(); - public static final long MAX_FROZEN_TERM_SIZE = SAI_MAX_FROZEN_TERM_SIZE.getSizeInBytes(); - public static final long MAX_VECTOR_TERM_SIZE = SAI_MAX_VECTOR_TERM_SIZE.getSizeInBytes(); - public static final String TERM_OVERSIZE_MESSAGE = "Can't add term of column {} to index for key: {}, term size {} " + - "max allowed size {}, use analyzed = true (if not yet set) for that column."; private final IndexDescriptor indexDescriptor; private final IndexContext indexContext; private final long nowInSec = FBUtilities.nowInSeconds(); private final AbstractAnalyzer analyzer; private final NamedMemoryLimiter limiter; - private final long maxTermSize; private final BooleanSupplier isIndexValid; private final List<SegmentMetadata> segments = new ArrayList<>(); @@ -84,7 +71,6 @@ public class SSTableIndexWriter implements PerColumnIndexWriter this.analyzer = indexContext.getAnalyzerFactory().create(); this.limiter = limiter; this.isIndexValid = isIndexValid; - this.maxTermSize = indexContext.isVector() ? MAX_VECTOR_TERM_SIZE : (indexContext.isFrozen() ? MAX_FROZEN_TERM_SIZE : MAX_STRING_TERM_SIZE); } @Override @@ -201,15 +187,8 @@ public class SSTableIndexWriter implements PerColumnIndexWriter private void addTerm(ByteBuffer term, PrimaryKey key, long sstableRowId, AbstractType<?> type) throws IOException { - if (term.remaining() >= maxTermSize) - { - noSpamLogger.warn(indexContext.logMessage(TERM_OVERSIZE_MESSAGE), - indexContext.getColumnName(), - indexContext.keyValidator().getString(key.partitionKey().getKey()), - FBUtilities.prettyPrintMemory(term.remaining()), - FBUtilities.prettyPrintMemory(maxTermSize)); + if (!indexContext.validateMaxTermSize(key.partitionKey(), term, false)) return; - } if (currentBuilder == null) { diff --git a/src/java/org/apache/cassandra/index/sai/memory/TrieMemoryIndex.java b/src/java/org/apache/cassandra/index/sai/memory/TrieMemoryIndex.java index 25e8257f47..843076394b 100644 --- a/src/java/org/apache/cassandra/index/sai/memory/TrieMemoryIndex.java +++ b/src/java/org/apache/cassandra/index/sai/memory/TrieMemoryIndex.java @@ -108,6 +108,8 @@ public class TrieMemoryIndex extends MemoryIndex while (analyzer.hasNext()) { final ByteBuffer term = analyzer.next(); + if (!indexContext.validateMaxTermSize(key, term, false)) + continue; setMinMaxTerm(term.duplicate()); diff --git a/src/java/org/apache/cassandra/index/sai/memory/VectorMemoryIndex.java b/src/java/org/apache/cassandra/index/sai/memory/VectorMemoryIndex.java index 3ba096b007..12b21d2205 100644 --- a/src/java/org/apache/cassandra/index/sai/memory/VectorMemoryIndex.java +++ b/src/java/org/apache/cassandra/index/sai/memory/VectorMemoryIndex.java @@ -78,7 +78,7 @@ public class VectorMemoryIndex extends MemoryIndex @Override public synchronized long add(DecoratedKey key, Clustering<?> clustering, ByteBuffer value) { - if (value == null || value.remaining() == 0) + if (value == null || value.remaining() == 0 || !indexContext.validateMaxTermSize(key, value, false)) return 0; var primaryKey = indexContext.hasClustering() ? indexContext.keyFactory().create(key, clustering) diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java index c9b43e4cd2..63f297dd51 100644 --- a/test/unit/org/apache/cassandra/cql3/CQLTester.java +++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java @@ -2586,6 +2586,12 @@ public abstract class CQLTester { return values.length; } + + @Override + public String toString() + { + return Arrays.toString(values); + } } // Attempt to find an AbstracType from a value (for serialization/printing sake). diff --git a/test/unit/org/apache/cassandra/index/sai/cql/StorageAttachedIndexDDLTest.java b/test/unit/org/apache/cassandra/index/sai/cql/StorageAttachedIndexDDLTest.java index f10149855d..d86be4ce93 100644 --- a/test/unit/org/apache/cassandra/index/sai/cql/StorageAttachedIndexDDLTest.java +++ b/test/unit/org/apache/cassandra/index/sai/cql/StorageAttachedIndexDDLTest.java @@ -21,6 +21,7 @@ package org.apache.cassandra.index.sai.cql; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Iterator; import java.util.List; @@ -39,6 +40,7 @@ import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.exceptions.InvalidConfigurationInQueryException; import com.datastax.driver.core.exceptions.InvalidQueryException; import com.datastax.driver.core.exceptions.ReadFailureException; +import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.CQL3Type; import org.apache.cassandra.cql3.ColumnIdentifier; @@ -76,6 +78,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.schema.IndexMetadata; import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Throwables; import org.assertj.core.api.Assertions; import org.mockito.Mockito; @@ -624,6 +627,50 @@ public class StorageAttachedIndexDDLTest extends SAITester assertThatThrownBy(() -> executeNet("SELECT id1 FROM %s WHERE v1>=0")).isInstanceOf(ReadFailureException.class); } + @Test + public void testMaxTermSize() throws Throwable + { + String largeTerm = UTF8Type.instance.compose(ByteBuffer.allocate(FBUtilities.MAX_UNSIGNED_SHORT / 2 + 1)); + int maxFloatVectorDimensions = (int) (CassandraRelevantProperties.SAI_MAX_VECTOR_TERM_SIZE.getSizeInBytes() / 4); // 4 bytes per dimension + Vector<Float> largeVector = vector(new float[maxFloatVectorDimensions + 1]); + + createTable(KEYSPACE, "CREATE TABLE %s (k int PRIMARY KEY, r text, m map<text, text>, v vector<float, " + largeVector.size() + ">)"); + createIndex("CREATE CUSTOM INDEX ON %s(r) USING 'StorageAttachedIndex'"); + createIndex("CREATE CUSTOM INDEX ON %s(ENTRIES(m)) USING 'StorageAttachedIndex'"); + createIndex("CREATE CUSTOM INDEX ON %s(v) USING 'StorageAttachedIndex' WITH OPTIONS = {'similarity_function' : 'euclidean'}"); + + // verify that a write exceeding max term size is accepted with client warnings + ResultSet resultSet = executeNet("INSERT INTO %s (k, r, m, v) VALUES (0, ?, {'" + largeTerm + "': ''}, " + largeVector + ')', largeTerm); + List<String> warnings = resultSet.getExecutionInfo().getWarnings(); + warnings.sort(String::compareTo); + assertEquals(3, warnings.size()); + assertTrue(warnings.get(0).contains("Can't add term of column m")); + assertTrue(warnings.get(1).contains("Can't add term of column r")); + assertTrue(warnings.get(2).contains("Can't add term of column v")); + + // verify that the large terms aren't written into the memtable indexes + assertRows(execute("SELECT k, r, m, v FROM %s"), row(0, largeTerm, map(largeTerm, ""), largeVector)); + assertEmpty(execute("SELECT * FROM %s WHERE r = ?", largeTerm)); + assertEmpty(execute("SELECT * FROM %s WHERE m[?] = ''", largeTerm)); + assertEmpty(execute("SELECT * FROM %s ORDER BY v ANN OF ? LIMIT 10", largeVector)); + + // verify that the large terms aren't written into the sstable indexes after flush + flush(); + assertRows(execute("SELECT k, r, m, v FROM %s"), row(0, largeTerm, map(largeTerm, ""), largeVector)); + assertEmpty(execute("SELECT * FROM %s WHERE r = ?", largeTerm)); + assertEmpty(execute("SELECT * FROM %s WHERE m[?] = ''", largeTerm)); + assertEmpty(execute("SELECT * FROM %s ORDER BY v ANN OF ? LIMIT 10", largeVector)); + + // verify that the large terms aren't written into the sstable indexes after compactions + executeNet("INSERT INTO %s (k, r, m, v) VALUES (0, ?, {'" + largeTerm + "': ''}, " + largeVector + ')', largeTerm); + flush(); + compact(); + assertRows(execute("SELECT k, r, m, v FROM %s"), row(0, largeTerm, map(largeTerm, ""), largeVector)); + assertEmpty(execute("SELECT * FROM %s WHERE r = ?", largeTerm)); + assertEmpty(execute("SELECT * FROM %s WHERE m[?] = ''", largeTerm)); + assertEmpty(execute("SELECT * FROM %s ORDER BY v ANN OF ? LIMIT 10", largeVector)); + } + @Test public void shouldReleaseIndexFilesAfterDroppingLastIndex() throws Throwable { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org