This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch cassandra-5.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-5.0 by this push:
     new 91a242fe00 SAI in-memory index should check max term size
91a242fe00 is described below

commit 91a242fe00c9cc740d6d80df61240eebf05f4458
Author: Andrés de la Peña <a.penya.gar...@gmail.com>
AuthorDate: Fri Nov 17 15:28:29 2023 +0000

    SAI in-memory index should check max term size
    
    patch by Zhao Yang and Andrés de la Peña; reviewed by Zhao Yang for 
CASSANDRA-18926
    
    Co-authored-by: Zhao Yang <zhaoyangsingap...@gmail.com>
    Co-authored-by: Andrés de la Peña <a.penya.gar...@gmail.com>
---
 CHANGES.txt                                        |  1 +
 .../apache/cassandra/index/sai/IndexContext.java   | 76 ++++++++++++++++++++++
 .../cassandra/index/sai/StorageAttachedIndex.java  |  6 +-
 .../index/sai/disk/v1/SSTableIndexWriter.java      | 23 +------
 .../index/sai/memory/TrieMemoryIndex.java          |  2 +
 .../index/sai/memory/VectorMemoryIndex.java        |  2 +-
 test/unit/org/apache/cassandra/cql3/CQLTester.java |  6 ++
 .../index/sai/cql/StorageAttachedIndexDDLTest.java | 47 +++++++++++++
 8 files changed, 139 insertions(+), 24 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 5102f80bff..9a4e2fa287 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.0-beta1
+ * SAI in-memory index should check max term size (CASSANDRA-18926)
  * Set default disk_access_mode to mmap_index_only (CASSANDRA-19021)
  * Exclude net.java.dev.jna:jna dependency from dependencies of 
org.caffinitas.ohc:ohc-core (CASSANDRA-18992)
  * Add UCS sstable_growth and min_sstable_size options (CASSANDRA-18945)
diff --git a/src/java/org/apache/cassandra/index/sai/IndexContext.java 
b/src/java/org/apache/cassandra/index/sai/IndexContext.java
index 26ef903c40..b88d59b6e6 100644
--- a/src/java/org/apache/cassandra/index/sai/IndexContext.java
+++ b/src/java/org/apache/cassandra/index/sai/IndexContext.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 
 import com.google.common.base.MoreObjects;
@@ -62,14 +63,28 @@ import org.apache.cassandra.index.sai.view.View;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.service.ClientWarn;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.NoSpamLogger;
 import org.apache.cassandra.utils.Pair;
 
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.SAI_MAX_FROZEN_TERM_SIZE;
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.SAI_MAX_STRING_TERM_SIZE;
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.SAI_MAX_VECTOR_TERM_SIZE;
+
 /**
  * Manages metadata for each column index.
  */
 public class IndexContext
 {
     private static final Logger logger = 
LoggerFactory.getLogger(IndexContext.class);
+    private static final NoSpamLogger noSpamLogger = 
NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES);
+
+    public static final long MAX_STRING_TERM_SIZE = 
SAI_MAX_STRING_TERM_SIZE.getSizeInBytes();
+    public static final long MAX_FROZEN_TERM_SIZE = 
SAI_MAX_FROZEN_TERM_SIZE.getSizeInBytes();
+    public static final long MAX_VECTOR_TERM_SIZE = 
SAI_MAX_VECTOR_TERM_SIZE.getSizeInBytes();
+    public static final String TERM_OVERSIZE_MESSAGE = "Can't add term of 
column %s to index for key: %s, term size %s " +
+                                                       "max allowed size %s, 
use analyzed = true (if not yet set) for that column.";
 
     private static final Set<AbstractType<?>> EQ_ONLY_TYPES = 
ImmutableSet.of(UTF8Type.instance,
                                                                               
AsciiType.instance,
@@ -98,6 +113,8 @@ public class IndexContext
     private final AbstractAnalyzer.AnalyzerFactory analyzerFactory;
     private final PrimaryKey.Factory primaryKeyFactory;
 
+    private final long maxTermSize;
+
     public IndexContext(String keyspace,
                         String table,
                         AbstractType<?> partitionKeyType,
@@ -128,6 +145,8 @@ public class IndexContext
 
         this.analyzerFactory = indexMetadata == null ? 
AbstractAnalyzer.fromOptions(getValidator(), Collections.emptyMap())
                                                      : 
AbstractAnalyzer.fromOptions(getValidator(), indexMetadata.options);
+
+        maxTermSize = isVector() ? MAX_VECTOR_TERM_SIZE : (isFrozen() ? 
MAX_FROZEN_TERM_SIZE : MAX_STRING_TERM_SIZE);
     }
 
     public boolean hasClustering()
@@ -509,4 +528,61 @@ public class IndexContext
                         .mapToLong(SSTableIndex::indexFileCacheSize)
                         .sum();
     }
+
+    /**
+     * Validate maximum term size for given row
+     */
+    public void validateMaxTermSizeForRow(DecoratedKey key, Row row, boolean 
sendClientWarning)
+    {
+        AbstractAnalyzer analyzer = getAnalyzerFactory().create();
+        if (isNonFrozenCollection())
+        {
+            Iterator<ByteBuffer> bufferIterator = getValuesOf(row, 
FBUtilities.nowInSeconds());
+            while (bufferIterator != null && bufferIterator.hasNext())
+                validateMaxTermSizeForCell(analyzer, key, 
bufferIterator.next(), sendClientWarning);
+        }
+        else
+        {
+            ByteBuffer value = getValueOf(key, row, 
FBUtilities.nowInSeconds());
+            validateMaxTermSizeForCell(analyzer, key, value, 
sendClientWarning);
+        }
+    }
+
+    private void validateMaxTermSizeForCell(AbstractAnalyzer analyzer, 
DecoratedKey key, @Nullable ByteBuffer cellBuffer, boolean sendClientWarning)
+    {
+        if (cellBuffer == null || cellBuffer.remaining() == 0)
+            return;
+
+        // analyzer should not return terms that are larger than the origin 
value.
+        if (cellBuffer.remaining() <= maxTermSize)
+            return;
+
+        analyzer.reset(cellBuffer.duplicate());
+        while (analyzer.hasNext())
+            validateMaxTermSize(key, analyzer.next(), sendClientWarning);
+    }
+
+    /**
+     * Validate maximum term size for given term
+     * @return true if given term is valid; otherwise false.
+     */
+    public boolean validateMaxTermSize(DecoratedKey key, ByteBuffer term, 
boolean sendClientWarning)
+    {
+        if (term.remaining() > maxTermSize)
+        {
+            String message = logMessage(String.format(TERM_OVERSIZE_MESSAGE,
+                                                      getColumnName(),
+                                                      
keyValidator().getString(key.getKey()),
+                                                      
FBUtilities.prettyPrintMemory(term.remaining()),
+                                                      
FBUtilities.prettyPrintMemory(maxTermSize)));
+
+            if (sendClientWarning)
+                ClientWarn.instance.warn(message);
+
+            noSpamLogger.warn(message);
+            return false;
+        }
+
+        return true;
+    }
 }
diff --git a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java 
b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java
index e78e2be50c..6ada975010 100644
--- a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java
+++ b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java
@@ -625,7 +625,11 @@ public class StorageAttachedIndex implements Index
 
     @Override
     public void validate(PartitionUpdate update) throws InvalidRequestException
-    {}
+    {
+        DecoratedKey key = update.partitionKey();
+        for (Row row : update)
+            indexContext.validateMaxTermSizeForRow(key, row, true);
+    }
 
     /**
      * This method is called by the startup tasks to find SSTables that don't 
have indexes. The method is
diff --git 
a/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java 
b/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java
index b8ffc72cae..6843206462 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java
@@ -44,11 +44,6 @@ import org.apache.cassandra.index.sai.utils.PrimaryKey;
 import org.apache.cassandra.index.sai.utils.TypeUtil;
 import org.apache.cassandra.utils.Clock;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.NoSpamLogger;
-
-import static 
org.apache.cassandra.config.CassandraRelevantProperties.SAI_MAX_FROZEN_TERM_SIZE;
-import static 
org.apache.cassandra.config.CassandraRelevantProperties.SAI_MAX_STRING_TERM_SIZE;
-import static 
org.apache.cassandra.config.CassandraRelevantProperties.SAI_MAX_VECTOR_TERM_SIZE;
 
 /**
  * Column index writer that accumulates (on-heap) indexed data from a 
compacted SSTable as it's being flushed to disk.
@@ -57,20 +52,12 @@ import static 
org.apache.cassandra.config.CassandraRelevantProperties.SAI_MAX_VE
 public class SSTableIndexWriter implements PerColumnIndexWriter
 {
     private static final Logger logger = 
LoggerFactory.getLogger(SSTableIndexWriter.class);
-    private static final NoSpamLogger noSpamLogger = 
NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES);
-
-    public static final long MAX_STRING_TERM_SIZE = 
SAI_MAX_STRING_TERM_SIZE.getSizeInBytes();
-    public static final long MAX_FROZEN_TERM_SIZE = 
SAI_MAX_FROZEN_TERM_SIZE.getSizeInBytes();
-    public static final long MAX_VECTOR_TERM_SIZE = 
SAI_MAX_VECTOR_TERM_SIZE.getSizeInBytes();
-    public static final String TERM_OVERSIZE_MESSAGE = "Can't add term of 
column {} to index for key: {}, term size {} " +
-                                                       "max allowed size {}, 
use analyzed = true (if not yet set) for that column.";
 
     private final IndexDescriptor indexDescriptor;
     private final IndexContext indexContext;
     private final long nowInSec = FBUtilities.nowInSeconds();
     private final AbstractAnalyzer analyzer;
     private final NamedMemoryLimiter limiter;
-    private final long maxTermSize;
     private final BooleanSupplier isIndexValid;
     private final List<SegmentMetadata> segments = new ArrayList<>();
 
@@ -84,7 +71,6 @@ public class SSTableIndexWriter implements 
PerColumnIndexWriter
         this.analyzer = indexContext.getAnalyzerFactory().create();
         this.limiter = limiter;
         this.isIndexValid = isIndexValid;
-        this.maxTermSize = indexContext.isVector() ? MAX_VECTOR_TERM_SIZE : 
(indexContext.isFrozen() ? MAX_FROZEN_TERM_SIZE : MAX_STRING_TERM_SIZE);
     }
 
     @Override
@@ -201,15 +187,8 @@ public class SSTableIndexWriter implements 
PerColumnIndexWriter
 
     private void addTerm(ByteBuffer term, PrimaryKey key, long sstableRowId, 
AbstractType<?> type) throws IOException
     {
-        if (term.remaining() >= maxTermSize)
-        {
-            noSpamLogger.warn(indexContext.logMessage(TERM_OVERSIZE_MESSAGE),
-                              indexContext.getColumnName(),
-                              
indexContext.keyValidator().getString(key.partitionKey().getKey()),
-                              FBUtilities.prettyPrintMemory(term.remaining()),
-                              FBUtilities.prettyPrintMemory(maxTermSize));
+        if (!indexContext.validateMaxTermSize(key.partitionKey(), term, false))
             return;
-        }
 
         if (currentBuilder == null)
         {
diff --git 
a/src/java/org/apache/cassandra/index/sai/memory/TrieMemoryIndex.java 
b/src/java/org/apache/cassandra/index/sai/memory/TrieMemoryIndex.java
index 25e8257f47..843076394b 100644
--- a/src/java/org/apache/cassandra/index/sai/memory/TrieMemoryIndex.java
+++ b/src/java/org/apache/cassandra/index/sai/memory/TrieMemoryIndex.java
@@ -108,6 +108,8 @@ public class TrieMemoryIndex extends MemoryIndex
             while (analyzer.hasNext())
             {
                 final ByteBuffer term = analyzer.next();
+                if (!indexContext.validateMaxTermSize(key, term, false))
+                    continue;
 
                 setMinMaxTerm(term.duplicate());
 
diff --git 
a/src/java/org/apache/cassandra/index/sai/memory/VectorMemoryIndex.java 
b/src/java/org/apache/cassandra/index/sai/memory/VectorMemoryIndex.java
index 3ba096b007..12b21d2205 100644
--- a/src/java/org/apache/cassandra/index/sai/memory/VectorMemoryIndex.java
+++ b/src/java/org/apache/cassandra/index/sai/memory/VectorMemoryIndex.java
@@ -78,7 +78,7 @@ public class VectorMemoryIndex extends MemoryIndex
     @Override
     public synchronized long add(DecoratedKey key, Clustering<?> clustering, 
ByteBuffer value)
     {
-        if (value == null || value.remaining() == 0)
+        if (value == null || value.remaining() == 0 || 
!indexContext.validateMaxTermSize(key, value, false))
             return 0;
 
         var primaryKey = indexContext.hasClustering() ? 
indexContext.keyFactory().create(key, clustering)
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java 
b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index c9b43e4cd2..63f297dd51 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -2586,6 +2586,12 @@ public abstract class CQLTester
         {
             return values.length;
         }
+
+        @Override
+        public String toString()
+        {
+            return Arrays.toString(values);
+        }
     }
 
     // Attempt to find an AbstracType from a value (for serialization/printing 
sake).
diff --git 
a/test/unit/org/apache/cassandra/index/sai/cql/StorageAttachedIndexDDLTest.java 
b/test/unit/org/apache/cassandra/index/sai/cql/StorageAttachedIndexDDLTest.java
index f10149855d..d86be4ce93 100644
--- 
a/test/unit/org/apache/cassandra/index/sai/cql/StorageAttachedIndexDDLTest.java
+++ 
b/test/unit/org/apache/cassandra/index/sai/cql/StorageAttachedIndexDDLTest.java
@@ -21,6 +21,7 @@
 package org.apache.cassandra.index.sai.cql;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
@@ -39,6 +40,7 @@ import com.datastax.driver.core.ResultSet;
 import 
com.datastax.driver.core.exceptions.InvalidConfigurationInQueryException;
 import com.datastax.driver.core.exceptions.InvalidQueryException;
 import com.datastax.driver.core.exceptions.ReadFailureException;
+import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.CQL3Type;
 import org.apache.cassandra.cql3.ColumnIdentifier;
@@ -76,6 +78,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Throwables;
 import org.assertj.core.api.Assertions;
 import org.mockito.Mockito;
@@ -624,6 +627,50 @@ public class StorageAttachedIndexDDLTest extends SAITester
         assertThatThrownBy(() -> executeNet("SELECT id1 FROM %s WHERE 
v1>=0")).isInstanceOf(ReadFailureException.class);
     }
 
+    @Test
+    public void testMaxTermSize() throws Throwable
+    {
+        String largeTerm = 
UTF8Type.instance.compose(ByteBuffer.allocate(FBUtilities.MAX_UNSIGNED_SHORT / 
2 + 1));
+        int maxFloatVectorDimensions = (int) 
(CassandraRelevantProperties.SAI_MAX_VECTOR_TERM_SIZE.getSizeInBytes() / 4); // 
4 bytes per dimension
+        Vector<Float> largeVector = vector(new float[maxFloatVectorDimensions 
+ 1]);
+
+        createTable(KEYSPACE, "CREATE TABLE %s (k int PRIMARY KEY, r text, m 
map<text, text>, v vector<float, " + largeVector.size() + ">)");
+        createIndex("CREATE CUSTOM INDEX ON %s(r) USING 
'StorageAttachedIndex'");
+        createIndex("CREATE CUSTOM INDEX ON %s(ENTRIES(m)) USING 
'StorageAttachedIndex'");
+        createIndex("CREATE CUSTOM INDEX ON %s(v) USING 'StorageAttachedIndex' 
WITH OPTIONS = {'similarity_function' : 'euclidean'}");
+
+        // verify that a write exceeding max term size is accepted with client 
warnings
+        ResultSet resultSet = executeNet("INSERT INTO %s (k, r, m, v) VALUES 
(0, ?, {'" + largeTerm + "': ''}, " + largeVector + ')', largeTerm);
+        List<String> warnings = resultSet.getExecutionInfo().getWarnings();
+        warnings.sort(String::compareTo);
+        assertEquals(3, warnings.size());
+        assertTrue(warnings.get(0).contains("Can't add term of column m"));
+        assertTrue(warnings.get(1).contains("Can't add term of column r"));
+        assertTrue(warnings.get(2).contains("Can't add term of column v"));
+
+        // verify that the large terms aren't written into the memtable indexes
+        assertRows(execute("SELECT k, r, m, v FROM %s"), row(0, largeTerm, 
map(largeTerm, ""), largeVector));
+        assertEmpty(execute("SELECT * FROM %s WHERE r = ?", largeTerm));
+        assertEmpty(execute("SELECT * FROM %s WHERE m[?] = ''", largeTerm));
+        assertEmpty(execute("SELECT * FROM %s ORDER BY v ANN OF ? LIMIT 10", 
largeVector));
+
+        // verify that the large terms aren't written into the sstable indexes 
after flush
+        flush();
+        assertRows(execute("SELECT k, r, m, v FROM %s"), row(0, largeTerm, 
map(largeTerm, ""), largeVector));
+        assertEmpty(execute("SELECT * FROM %s WHERE r = ?", largeTerm));
+        assertEmpty(execute("SELECT * FROM %s WHERE m[?] = ''", largeTerm));
+        assertEmpty(execute("SELECT * FROM %s ORDER BY v ANN OF ? LIMIT 10", 
largeVector));
+
+        // verify that the large terms aren't written into the sstable indexes 
after compactions
+        executeNet("INSERT INTO %s (k, r, m, v) VALUES (0, ?, {'" + largeTerm 
+ "': ''}, " + largeVector + ')', largeTerm);
+        flush();
+        compact();
+        assertRows(execute("SELECT k, r, m, v FROM %s"), row(0, largeTerm, 
map(largeTerm, ""), largeVector));
+        assertEmpty(execute("SELECT * FROM %s WHERE r = ?", largeTerm));
+        assertEmpty(execute("SELECT * FROM %s WHERE m[?] = ''", largeTerm));
+        assertEmpty(execute("SELECT * FROM %s ORDER BY v ANN OF ? LIMIT 10", 
largeVector));
+    }
+
     @Test
     public void shouldReleaseIndexFilesAfterDroppingLastIndex() throws 
Throwable
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to