This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8d325d50ed Allow value/element indexing on frozen collections in SAI
8d325d50ed is described below

commit 8d325d50ed36d04d99b02469af2e3ceaa3384888
Author: Sunil Ramchandra Pawar <[email protected]>
AuthorDate: Thu Jan 15 11:21:07 2026 +0530

    Allow value/element indexing on frozen collections in SAI
    
    patch by Sunil Ramchandra Pawar; reviewed by Caleb Rackliffe, David 
Capwell, and Andres de la Peña for CASSANDRA-18492
---
 CHANGES.txt                                        |    1 +
 src/java/org/apache/cassandra/cql3/Relation.java   |    7 +-
 .../cql3/restrictions/MergedRestriction.java       |   13 +
 .../cql3/restrictions/SimpleRestriction.java       |   42 +-
 .../statements/schema/CreateIndexStatement.java    |   40 +-
 .../org/apache/cassandra/db/filter/RowFilter.java  |    5 +
 src/java/org/apache/cassandra/index/Index.java     |   21 +
 .../cassandra/index/sai/StorageAttachedIndex.java  |   31 +
 .../index/sai/disk/v1/SSTableIndexWriter.java      |   13 +
 .../index/sai/memory/MemtableIndexManager.java     |   13 +
 .../cassandra/index/sai/plan/FilterTree.java       |    8 +
 .../cassandra/index/sai/utils/IndexTermType.java   |  110 +-
 .../org/apache/cassandra/schema/IndexMetadata.java |   11 +
 .../entities/SecondaryIndexOnMapEntriesTest.java   |    3 +-
 .../validation/entities/SecondaryIndexTest.java    |    4 +-
 .../operations/SelectSingleColumnRelationTest.java |    3 +-
 .../cql3/validation/operations/SelectTest.java     |   20 +-
 .../db/AbstractReadQueryToCQLStringTest.java       |    3 +-
 .../index/sai/cql/CollectionIndexingTest.java      | 1117 +++++++++++++++++++-
 19 files changed, 1421 insertions(+), 44 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 8c90013fcc..ec7d803a9f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Allow value/element indexing on frozen collections in SAI (CASSANDRA-18492)
  * Add tool to offline dump cluster metadata and the log (CASSANDRA-21129)
  * Send client warnings when writing to a large partition (CASSANDRA-17258)
  * Harden the possible range of values for max dictionary size and max total 
sample size for dictionary training (CASSANDRA-21194)
diff --git a/src/java/org/apache/cassandra/cql3/Relation.java 
b/src/java/org/apache/cassandra/cql3/Relation.java
index 804b6e79e8..5de031102c 100644
--- a/src/java/org/apache/cassandra/cql3/Relation.java
+++ b/src/java/org/apache/cassandra/cql3/Relation.java
@@ -44,6 +44,8 @@ import static 
org.apache.cassandra.cql3.statements.RequestValidations.invalidReq
  */
 public final class Relation
 {
+    public static final String FROZEN_MAP_ENTRY_PREDICATES_NOT_SUPPORTED = 
"Map-entry predicates on frozen map column %s are not supported";
+
     /**
      * The raw columns'expression.
      */
@@ -204,7 +206,10 @@ public final class Relation
             AbstractType<?> baseType = column.type.unwrap();
             checkFalse(baseType instanceof ListType, "Indexes on list entries 
(%s[index] = value) are not supported.", column.name);
             checkTrue(baseType instanceof MapType, "Column %s cannot be used 
as a map", column.name);
-            checkTrue(baseType.isMultiCell(), "Map-entry predicates on frozen 
map column %s are not supported", column.name);
+
+            if (column.isClusteringColumn() && baseType.isCollection() && 
!column.type.isMultiCell())
+                throw 
invalidRequest(FROZEN_MAP_ENTRY_PREDICATES_NOT_SUPPORTED, column.name);
+
             columnsExpression.collectMarkerSpecification(boundNames);
         }
 
diff --git 
a/src/java/org/apache/cassandra/cql3/restrictions/MergedRestriction.java 
b/src/java/org/apache/cassandra/cql3/restrictions/MergedRestriction.java
index bb352cb076..447a1ac6dc 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/MergedRestriction.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/MergedRestriction.java
@@ -27,6 +27,7 @@ import com.google.common.collect.RangeSet;
 
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.Relation;
 import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.db.filter.IndexHints;
 import org.apache.cassandra.db.filter.RowFilter;
@@ -126,9 +127,21 @@ public final class MergedRestriction implements 
SingleRestriction
         checkOperator(other);
 
         if (restriction.isContains() != other.isContains())
+        {
+            SimpleRestriction mapEntryRestriction = restriction.isContains() ? 
restriction : other;
+            if (mapEntryRestriction.isMapElementExpression())
+            {
+                ColumnMetadata column = mapEntryRestriction.firstColumn();
+                if (column.type.isFrozenCollection())
+                {
+                    throw 
invalidRequest(Relation.FROZEN_MAP_ENTRY_PREDICATES_NOT_SUPPORTED, column.name);
+                }
+            }
+
             throw invalidRequest("Collection column %s can only be restricted 
by CONTAINS, CONTAINS KEY, NOT_CONTAINS, NOT_CONTAINS_KEY" +
                                  " or map-entry equality if it already 
restricted by one of those",
                                  restriction.firstColumn().name);
+        }
 
         if (restriction.isSlice() && other.isSlice())
         {
diff --git 
a/src/java/org/apache/cassandra/cql3/restrictions/SimpleRestriction.java 
b/src/java/org/apache/cassandra/cql3/restrictions/SimpleRestriction.java
index c5e05560a2..e8db9daa54 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/SimpleRestriction.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/SimpleRestriction.java
@@ -30,6 +30,7 @@ import com.google.common.collect.RangeSet;
 import org.apache.cassandra.cql3.ColumnsExpression;
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.Relation;
 import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.cql3.terms.Term;
 import org.apache.cassandra.cql3.terms.Terms;
@@ -157,6 +158,14 @@ public final class SimpleRestriction implements 
SingleRestriction
                 || columnsExpression.isMapElementExpression();
     }
 
+    /**
+     * Checks if this restriction is a map element expression (e.g., 
map['key'] = value).
+     */
+    public boolean isMapElementExpression()
+    {
+        return columnsExpression.isMapElementExpression();
+    }
+
     @Override
     public boolean needsFilteringOrIndexing()
     {
@@ -218,6 +227,18 @@ public final class SimpleRestriction implements 
SingleRestriction
         if (isOnToken())
             return false;
 
+        // For map element expressions, check if the index explicitly supports 
them.
+        if (columnsExpression.isMapElementExpression())
+        {
+            // If the index directly supports map element expressions, return 
true
+            if (index.supportsMapElementExpression())
+                return true;
+
+            // Supports post-filtering only and require ALLOW FILTERING
+            if (index.supportsFilteringOnMapElementExpression())
+                return false;
+        }
+
         for (ColumnMetadata column : columns())
         {
             if (index.supportsExpression(column, operator))
@@ -415,13 +436,28 @@ public final class SimpleRestriction implements 
SingleRestriction
                 // TODO only map elements supported for now
                 if (columnsExpression.isMapElementExpression())
                 {
+                    // For frozen maps, check if any index on the column can 
support map entry predicates
+                    // either directly or via filtering. If not, throw an 
error.
+                    if (column.type.isFrozenCollection())
+                    {
+                        for (Index index : indexRegistry.listIndexes())
+                        {
+                            if (index.dependsOn(column)
+                                && !index.supportsMapElementExpression()
+                                && 
!index.supportsFilteringOnMapElementExpression())
+                            {
+                                throw 
invalidRequest(Relation.FROZEN_MAP_ENTRY_PREDICATES_NOT_SUPPORTED, column.name);
+                            }
+                        }
+                    }
+
                     ByteBuffer key = columnsExpression.element(options);
                     if (key == null)
-                        throw invalidRequest("Invalid null map key for column 
%s", firstColumn().name.toCQLString());
+                        throw invalidRequest("Invalid null map key for column 
%s", column.name.toCQLString());
                     if (key == ByteBufferUtil.UNSET_BYTE_BUFFER)
-                        throw invalidRequest("Invalid unset map key for column 
%s", firstColumn().name.toCQLString());
+                        throw invalidRequest("Invalid unset map key for column 
%s", column.name.toCQLString());
                     List<ByteBuffer> values = bindAndGet(options);
-                    filter.addMapEquality(firstColumn(), key, operator, 
values.get(0));
+                    filter.addMapEquality(column, key, operator, 
values.get(0));
                 }
                 break;
             default: throw new UnsupportedOperationException();
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java
 
b/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java
index 332857a943..d3e08dfee5 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java
@@ -81,10 +81,13 @@ public final class CreateIndexStatement extends 
AlterSchemaStatement
     public static final String ONLY_PARTITION_KEY = "Cannot create secondary 
index on the only partition key column %s";
     public static final String CREATE_ON_FROZEN_COLUMN = "Cannot create %s() 
index on frozen column %s. Frozen collections are immutable and must be fully " 
+
                                                          "indexed by using the 
'full(%s)' modifier";
-    public static final String FULL_ON_FROZEN_COLLECTIONS = "full() indexes 
can only be created on frozen collections";
+    public static final String FULL_ON_FROZEN_COLLECTIONS = "full() non-SAI 
indexes can only be created on frozen collections";
     public static final String NON_COLLECTION_SIMPLE_INDEX = "Cannot create 
%s() index on %s. Non-collection columns only support simple indexes";
     public static final String CREATE_WITH_NON_MAP_TYPE = "Cannot create index 
on %s of column %s with non-map type";
     public static final String CREATE_ON_NON_FROZEN_UDT = "Cannot create index 
on non-frozen UDT column %s";
+    public static final String 
ENTRIES_INDEX_ON_FROZEN_MAP_CLUSTERING_KEY_NOT_SUPPORTED = "Cannot create 
ENTRIES index on frozen map clustering column '%s'. " +
+                                                                               
            "Map entry predicates (column[key] = value) are not supported on 
clustering columns. " +
+                                                                               
            "Use FULL, KEYS, or VALUES index instead.";
     public static final String INDEX_ALREADY_EXISTS = "Index '%s' already 
exists";
     public static final String INDEX_DUPLICATE_OF_EXISTING = "Index %s is a 
duplicate of existing index %s";
     public static final String KEYSPACE_DOES_NOT_MATCH_TABLE = "Keyspace name 
'%s' doesn't match table name '%s'";
@@ -200,7 +203,7 @@ public final class CreateIndexStatement extends 
AlterSchemaStatement
 
         IndexMetadata.Kind kind = attrs.isCustom ? IndexMetadata.Kind.CUSTOM : 
IndexMetadata.Kind.COMPOSITES;
 
-        indexTargets.forEach(t -> validateIndexTarget(table, kind, t));
+        indexTargets.forEach(t -> validateIndexTarget(table, kind, t, attrs));
 
         String name = null == indexName ? generateIndexName(keyspace, 
indexTargets) : indexName;
 
@@ -244,7 +247,7 @@ public final class CreateIndexStatement extends 
AlterSchemaStatement
             throw ire(TOO_LONG_CUSTOM_INDEX_TARGET, name, 
SchemaConstants.NAME_LENGTH);
     }
 
-    private void validateIndexTarget(TableMetadata table, IndexMetadata.Kind 
kind, IndexTarget target)
+    private void validateIndexTarget(TableMetadata table, IndexMetadata.Kind 
kind, IndexTarget target, IndexAttributes attrs)
     {
         ColumnMetadata column = table.getColumn(target.column);
 
@@ -253,6 +256,8 @@ public final class CreateIndexStatement extends 
AlterSchemaStatement
 
         AbstractType<?> baseType = column.type.unwrap();
 
+        boolean isNonSAIIndex = !isSAIIndex(attrs);
+
         // TODO: this check needs to be removed with CASSANDRA-20235
         if ((kind == IndexMetadata.Kind.CUSTOM))
             validateCustomIndexColumnName(target.column.toString());
@@ -283,22 +288,41 @@ public final class CreateIndexStatement extends 
AlterSchemaStatement
         if (column.isPartitionKey() && table.partitionKeyColumns().size() == 1)
             throw ire(ONLY_PARTITION_KEY, column);
 
-        if (baseType.isFrozenCollection() && target.type != Type.FULL)
-            throw ire(CREATE_ON_FROZEN_COLUMN, target.type, column, 
column.name.toCQLString());
-
-        if (!baseType.isFrozenCollection() && target.type == Type.FULL)
+        if (target.type == Type.FULL && isNonSAIIndex && 
(!baseType.isCollection() || column.type.isMultiCell()))
             throw ire(FULL_ON_FROZEN_COLLECTIONS);
 
         if (!baseType.isCollection() && target.type != Type.SIMPLE)
             throw ire(NON_COLLECTION_SIMPLE_INDEX, target.type, column);
 
-        if (!(baseType instanceof MapType && baseType.isMultiCell()) && 
(target.type == Type.KEYS || target.type == Type.KEYS_AND_VALUES))
+        // Frozen collections are only supported with SAI indexes.
+        if (isNonSAIIndex && baseType.isCollection() && 
!column.type.isMultiCell())
+        {
+            if (target.type == Type.VALUES || target.type == Type.KEYS || 
target.type == Type.KEYS_AND_VALUES)
+            {
+                throw ire(CREATE_ON_FROZEN_COLUMN, target.type.toString(), 
column.name, column.name);
+            }
+        }
+
+        if (!(baseType instanceof MapType) && (target.type == Type.KEYS || 
target.type == Type.KEYS_AND_VALUES ))
             throw ire(CREATE_WITH_NON_MAP_TYPE, target.type, column);
 
+        // Can't query map[key]=value on clustering key columns, so ENTRIES 
index would be not queryable.
+        if (column.isClusteringColumn() && baseType instanceof MapType && 
!column.type.isMultiCell()
+            && target.type == Type.KEYS_AND_VALUES)
+            throw 
ire(ENTRIES_INDEX_ON_FROZEN_MAP_CLUSTERING_KEY_NOT_SUPPORTED, column.name);
+
         if (column.type.isUDT() && column.type.isMultiCell())
             throw ire(CREATE_ON_NON_FROZEN_UDT, column);
     }
 
+    /**
+     * Checks if the given index attributes represent a Storage Attached Index.
+     */
+    private boolean isSAIIndex(IndexAttributes attrs)
+    {
+        return attrs.isCustom && IndexMetadata.isSAIIndex(attrs.customClass);
+    }
+
     private String generateIndexName(KeyspaceMetadata keyspace, 
List<IndexTarget> targets)
     {
         String baseName = targets.size() == 1
diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java 
b/src/java/org/apache/cassandra/db/filter/RowFilter.java
index 53a3ce940e..ef9f1a7e2c 100644
--- a/src/java/org/apache/cassandra/db/filter/RowFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java
@@ -527,6 +527,11 @@ public class RowFilter implements 
Iterable<RowFilter.Expression>
             return operator;
         }
 
+        public boolean isMapElementExpression()
+        {
+            return kind() == Kind.MAP_ELEMENT;
+        }
+
         /**
          * If this expression is used to query an index, the value to use as
          * partition key for that index query.
diff --git a/src/java/org/apache/cassandra/index/Index.java 
b/src/java/org/apache/cassandra/index/Index.java
index 61127407bb..73c4ffe480 100644
--- a/src/java/org/apache/cassandra/index/Index.java
+++ b/src/java/org/apache/cassandra/index/Index.java
@@ -437,6 +437,27 @@ public interface Index
         return true;
     }
 
+    /**
+     * Return whether this index supports map element expressions on frozen 
map columns.
+     *
+     * @return {@code true} if this index supports map element else {@code 
false}.
+     */
+    default boolean supportsMapElementExpression()
+    {
+        return false;
+    }
+
+    /**
+     * Returns whether index allows filtering for map element expressions on 
frozen collections.
+     * SAI can handle map element predicates via post-filtering.
+     *
+     * @return {@code true} if map element expressions can be evaluated via 
filtering, {@code false} otherwise.
+     */
+    default boolean supportsFilteringOnMapElementExpression()
+    {
+        return false;
+    }
+
     /**
      * If the index supports custom search expressions using the
      * {@code}SELECT * FROM table WHERE expr(index_name, expression){@code} 
syntax, this
diff --git a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java 
b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java
index 69217a6825..1dba97d3ea 100644
--- a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java
+++ b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java
@@ -453,12 +453,37 @@ public class StorageAttachedIndex implements Index
         return dependsOn(column) && indexTermType.supports(operator);
     }
 
+    @Override
+    public boolean supportsExpression(RowFilter.Expression expression)
+    {
+        if (expression.isMapElementExpression() &&
+            indexTermType.isFrozenCollection() &&
+            indexTermType.indexTargetType() == IndexTarget.Type.FULL)
+
+            return false;
+
+        return supportsExpression(expression.column(), expression.operator());
+    }
+
     @Override
     public boolean filtersMultipleContains()
     {
         return false;
     }
 
+    @Override
+    public boolean supportsMapElementExpression()
+    {
+        return termType().indexTargetType() == 
IndexTarget.Type.KEYS_AND_VALUES;
+    }
+
+    @Override
+    public boolean supportsFilteringOnMapElementExpression()
+    {
+        // SAI supports map element expressions via post-filtering on frozen 
collections
+        return true;
+    }
+
     @Override
     public AbstractType<?> customExpressionValueType()
     {
@@ -784,6 +809,12 @@ public class StorageAttachedIndex implements Index
             while (bufferIterator != null && bufferIterator.hasNext())
                 validateTermSizeForCell(analyzer, key, bufferIterator.next(), 
isClientMutation, state);
         }
+        else if (indexTermType.isFrozenCollection() && 
indexTermType.indexTargetType() != IndexTarget.Type.FULL)
+        {
+            Iterator<ByteBuffer> bufferIterator = 
indexTermType.valuesOfFrozenCollection(row, FBUtilities.nowInSeconds());
+            while (bufferIterator != null && bufferIterator.hasNext())
+                validateTermSizeForCell(analyzer, key, bufferIterator.next(), 
isClientMutation, state);
+        }
         else
         {
             ByteBuffer value = indexTermType.valueOf(key, row, 
FBUtilities.nowInSeconds());
diff --git 
a/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java 
b/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java
index 65480f0985..6f995730d1 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java
@@ -32,6 +32,7 @@ import com.google.common.base.Stopwatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.cql3.statements.schema.IndexTarget;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.index.sai.StorageAttachedIndex;
 import org.apache.cassandra.index.sai.analyzer.AbstractAnalyzer;
@@ -94,6 +95,18 @@ public class SSTableIndexWriter implements 
PerColumnIndexWriter
                 }
             }
         }
+        else if (index.termType().isFrozenCollection() && 
index.termType().indexTargetType() != IndexTarget.Type.FULL)
+        {
+            Iterator<ByteBuffer> valueIterator = 
index.termType().valuesOfFrozenCollection(row, nowInSec);
+            if (valueIterator != null)
+            {
+                while (valueIterator.hasNext())
+                {
+                    ByteBuffer value = valueIterator.next();
+                    addTerm(index.termType().asIndexBytes(value.duplicate()), 
key, sstableRowId);
+                }
+            }
+        }
         else
         {
             ByteBuffer value = index.termType().valueOf(key.partitionKey(), 
row, nowInSec);
diff --git 
a/src/java/org/apache/cassandra/index/sai/memory/MemtableIndexManager.java 
b/src/java/org/apache/cassandra/index/sai/memory/MemtableIndexManager.java
index 99bff5a1e1..686eb100b1 100644
--- a/src/java/org/apache/cassandra/index/sai/memory/MemtableIndexManager.java
+++ b/src/java/org/apache/cassandra/index/sai/memory/MemtableIndexManager.java
@@ -31,6 +31,7 @@ import javax.annotation.Nullable;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import org.apache.cassandra.cql3.statements.schema.IndexTarget;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
 import org.apache.cassandra.db.memtable.Memtable;
@@ -76,6 +77,18 @@ public class MemtableIndexManager
                 }
             }
         }
+        else if (index.termType().isFrozenCollection() && 
index.termType().indexTargetType() != IndexTarget.Type.FULL)
+        {
+            Iterator<ByteBuffer> bufferIterator = 
index.termType().valuesOfFrozenCollection(row, FBUtilities.nowInSeconds());
+            if (bufferIterator != null)
+            {
+                while (bufferIterator.hasNext())
+                {
+                    ByteBuffer value = bufferIterator.next();
+                    bytes += target.index(key, row.clustering(), value);
+                }
+            }
+        }
         else
         {
             ByteBuffer value = index.termType().valueOf(key, row, 
FBUtilities.nowInSeconds());
diff --git a/src/java/org/apache/cassandra/index/sai/plan/FilterTree.java 
b/src/java/org/apache/cassandra/index/sai/plan/FilterTree.java
index 15ea273145..03ce9daf3b 100644
--- a/src/java/org/apache/cassandra/index/sai/plan/FilterTree.java
+++ b/src/java/org/apache/cassandra/index/sai/plan/FilterTree.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.ListIterator;
 
+import org.apache.cassandra.cql3.statements.schema.IndexTarget;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.index.sai.QueryContext;
@@ -161,6 +162,12 @@ public class FilterTree
             Iterator<ByteBuffer> valueIterator = 
expression.getIndexTermType().valuesOf(row, now);
             return operator.apply(result, collectionMatch(valueIterator, 
expression));
         }
+        else if (expression.getIndexTermType().isFrozenCollection() && 
expression.getIndexTermType().indexTargetType() != IndexTarget.Type.FULL)
+        {
+            Iterator<ByteBuffer> valueIterator = 
expression.getIndexTermType().valuesOfFrozenCollection(row, now);
+            boolean matchResult = collectionMatch(valueIterator, expression);
+            return operator.apply(result, matchResult);
+        }
         else
         {
             ByteBuffer value = expression.getIndexTermType().valueOf(key, row, 
now);
@@ -181,6 +188,7 @@ public class FilterTree
         while (valueIterator.hasNext())
         {
             ByteBuffer value = valueIterator.next();
+
             if (value == null)
                 continue;
 
diff --git a/src/java/org/apache/cassandra/index/sai/utils/IndexTermType.java 
b/src/java/org/apache/cassandra/index/sai/utils/IndexTermType.java
index 05abe93c38..e5c75a53a2 100644
--- a/src/java/org/apache/cassandra/index/sai/utils/IndexTermType.java
+++ b/src/java/org/apache/cassandra/index/sai/utils/IndexTermType.java
@@ -34,6 +34,8 @@ import java.util.Set;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
+import javax.annotation.Nullable;
+
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableSet;
 import com.googlecode.concurrenttrees.radix.ConcurrentRadixTree;
@@ -309,6 +311,11 @@ public class IndexTermType
         return columnMetadata.name.toString();
     }
 
+    public IndexTarget.Type indexTargetType()
+    {
+        return indexTargetType;
+    }
+
     public AbstractType<?> vectorElementType()
     {
         assert isVector();
@@ -453,10 +460,80 @@ public class IndexTermType
         }
     }
 
+    @Nullable
+    public Iterator<ByteBuffer> valuesOfFrozenCollection(Row row, long 
nowInSecs)
+    {
+        if (row == null)
+            return null;
+
+        ByteBuffer buffer;
+
+        if (columnMetadata.kind == ColumnMetadata.Kind.CLUSTERING)
+             buffer = row.clustering().bufferAt(columnMetadata.position());
+        else
+        {
+            Cell<?> cell = row.getCell(columnMetadata);
+            if (cell == null || !cell.isLive(nowInSecs))
+                return null;
+            buffer = cell.buffer();
+        }
+
+        if (buffer == null || buffer.remaining() == 0)
+            return null;
+
+        CollectionType<?> collectionType = (CollectionType<?>) 
columnMetadata.type.unwrap();
+        List<ByteBuffer> elements = collectionType.unpack(buffer);
+
+        switch (collectionType.kind)
+        {
+            case LIST:
+            case SET:
+                break;
+            case MAP:
+                elements = extractMapElements(elements);
+                break;
+            default:
+                throw new IllegalArgumentException("Unsupported type of 
collection - " + collectionType.kind);
+        }
+
+        if (isInetAddress())
+            elements.sort((c1, c2) -> compareInet(encodeInetAddress(c1), 
encodeInetAddress(c2)));
+
+        return elements.iterator();
+    }
+
+    private List<ByteBuffer> extractMapElements(List<ByteBuffer> elements)
+    {
+        List<ByteBuffer> result = new ArrayList<>(elements.size());
+
+        for (int i = 0; i < elements.size(); i += 2)
+        {
+            ByteBuffer key = elements.get(i);
+            ByteBuffer value = i + 1 < elements.size() ? elements.get(i + 1) : 
null;
+
+            switch (indexTargetType)
+            {
+                case KEYS:
+                    result.add(key);
+                    break;
+                case VALUES:
+                    if (value != null)
+                        result.add(value);
+                    break;
+                case KEYS_AND_VALUES:
+                    if (value != null)
+                        
result.add(CompositeType.build(ByteBufferAccessor.instance, key, value));
+                    break;
+            }
+        }
+
+        return result;
+    }
+
     public Comparator<ByteBuffer> comparator()
     {
-        // Override the comparator for BigInteger, frozen collections and 
composite types
-        if (isBigInteger() || isBigDecimal() || isComposite() || isFrozen())
+        // Override the comparator for BigInteger, BigDecimal and composite 
types
+        if (isBigInteger() || isBigDecimal() || isComposite())
             return FastByteOperations::compareUnsigned;
 
         return indexType;
@@ -474,9 +551,9 @@ public class IndexTermType
             return compareInet(b1, b2);
         else if (isLong())
             return indexType.unwrap().compare(b1, b2);
-        // BigInteger values, frozen types and composite types (map entries) 
use compareUnsigned to maintain
+        // BigInteger, BigDecimal and composite types (map entries) use 
compareUnsigned to maintain
         // a consistent order between the in-memory index and the on-disk 
index.
-        else if (isBigInteger() || isBigDecimal() || isComposite() || 
isFrozen())
+        else if (isBigInteger() || isBigDecimal() || isComposite())
             return FastByteOperations.compareUnsigned(b1, b2);
 
         return indexType.compare(b1, b2);
@@ -612,6 +689,23 @@ public class IndexTermType
             return indexTargetType == IndexTarget.Type.KEYS_AND_VALUES && 
indexOperator == Expression.IndexOperator.EQ;
         }
 
+        if (isFrozenCollection())
+        {
+            if (indexTargetType == IndexTarget.Type.VALUES)
+                return indexOperator == 
Expression.IndexOperator.CONTAINS_VALUE;
+
+            if (indexTargetType == IndexTarget.Type.KEYS)
+                return indexOperator == Expression.IndexOperator.CONTAINS_KEY;
+
+            if (indexTargetType == IndexTarget.Type.KEYS_AND_VALUES)
+                return indexOperator == Expression.IndexOperator.EQ;
+
+            if (indexTargetType == IndexTarget.Type.FULL)
+                return indexOperator == Expression.IndexOperator.EQ;
+
+            return false;
+        }
+
         if (indexTargetType == IndexTarget.Type.FULL)
             return indexOperator == Expression.IndexOperator.EQ;
 
@@ -725,7 +819,9 @@ public class IndexTermType
 
     private AbstractType<?> calculateIndexType(AbstractType<?> baseType, 
EnumSet<Capability> capabilities, IndexTarget.Type indexTargetType)
     {
-        return capabilities.contains(Capability.NON_FROZEN_COLLECTION) ? 
collectionCellValueType(baseType, indexTargetType) : baseType;
+        if (IndexTarget.Type.FULL == indexTargetType)
+            return baseType;
+        return capabilities.contains(Capability.COLLECTION) ? 
collectionCellValueType(baseType, indexTargetType) : baseType;
     }
 
     private Iterator<ByteBuffer> collectionIterator(ComplexColumnData 
cellData, long nowInSecs)
@@ -747,7 +843,7 @@ public class IndexTermType
     {
         if (isNonFrozenCollection())
         {
-            switch (((CollectionType<?>) columnMetadata.type).kind)
+            switch (((CollectionType<?>) columnMetadata.type.unwrap()).kind)
             {
                 case LIST:
                     return cell.buffer();
@@ -770,7 +866,7 @@ public class IndexTermType
 
     private AbstractType<?> collectionCellValueType(AbstractType<?> type, 
IndexTarget.Type indexType)
     {
-        CollectionType<?> collection = ((CollectionType<?>) type);
+        CollectionType<?> collection = ((CollectionType<?>) type.unwrap());
         switch (collection.kind)
         {
             case LIST:
diff --git a/src/java/org/apache/cassandra/schema/IndexMetadata.java 
b/src/java/org/apache/cassandra/schema/IndexMetadata.java
index 585f51e7df..15a0670e3a 100644
--- a/src/java/org/apache/cassandra/schema/IndexMetadata.java
+++ b/src/java/org/apache/cassandra/schema/IndexMetadata.java
@@ -225,6 +225,17 @@ public final class IndexMetadata
         return kind == Kind.COMPOSITES;
     }
 
+    /**
+     * Checks if the given custom index class name represents a SAI.
+     */
+    public static boolean isSAIIndex(String customClass)
+    {
+        if (customClass == null)
+            return false;
+        String resolved = 
indexNameAliases.getOrDefault(toLowerCaseLocalized(customClass), customClass);
+        return StorageAttachedIndex.class.getName().equals(resolved);
+    }
+
     @Override
     public int hashCode()
     {
diff --git 
a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexOnMapEntriesTest.java
 
b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexOnMapEntriesTest.java
index 0ef0bbc4d6..7e6aef938b 100644
--- 
a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexOnMapEntriesTest.java
+++ 
b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexOnMapEntriesTest.java
@@ -28,6 +28,7 @@ import org.junit.Test;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.cql3.Relation;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.dht.ByteOrderedPartitioner;
 import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -242,7 +243,7 @@ public class SecondaryIndexOnMapEntriesTest extends 
CQLTester
         }
         catch (InvalidRequestException e)
         {
-            String expectedMessage = "Map-entry predicates on frozen map 
column v are not supported";
+            String expectedMessage = 
String.format(Relation.FROZEN_MAP_ENTRY_PREDICATES_NOT_SUPPORTED, "v");
             assertTrue("Expected error message to contain '" + expectedMessage 
+ "' but got '" +
                        e.getMessage() + "'", 
e.getMessage().contains(expectedMessage));
         }
diff --git 
a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
 
b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
index ca5049ef55..6ed67520f8 100644
--- 
a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
+++ 
b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
@@ -1560,7 +1560,7 @@ public class SecondaryIndexTest extends CQLTester
         execute("INSERT INTO %s (k, v) VALUES (?, ?)", 1, set(udt1));
         assertInvalidMessage("Cannot create index on keys of column v with 
non-map type",
                              "CREATE INDEX ON %s (keys(v))");
-        assertInvalidMessage("full() indexes can only be created on frozen 
collections",
+        assertInvalidMessage("full() non-SAI indexes can only be created on 
frozen collections",
                              "CREATE INDEX ON %s (full(v))");
         String indexName = createIndex("CREATE INDEX ON %s (values(v))");
 
@@ -1589,7 +1589,7 @@ public class SecondaryIndexTest extends CQLTester
         assertInvalidMessage("Cannot create index on non-frozen UDT column v", 
"CREATE INDEX ON %s (v)");
         assertInvalidMessage("Cannot create keys() index on v. Non-collection 
columns only support simple indexes", "CREATE INDEX ON %s (keys(v))");
         assertInvalidMessage("Cannot create values() index on v. 
Non-collection columns only support simple indexes", "CREATE INDEX ON %s 
(values(v))");
-        assertInvalidMessage("full() indexes can only be created on frozen 
collections", "CREATE INDEX ON %s (full(v))");
+        assertInvalidMessage("full() non-SAI indexes can only be created on 
frozen collections", "CREATE INDEX ON %s (full(v))");
     }
 
     @Test
diff --git 
a/test/unit/org/apache/cassandra/cql3/validation/operations/SelectSingleColumnRelationTest.java
 
b/test/unit/org/apache/cassandra/cql3/validation/operations/SelectSingleColumnRelationTest.java
index 1ebeec495e..187cba1d82 100644
--- 
a/test/unit/org/apache/cassandra/cql3/validation/operations/SelectSingleColumnRelationTest.java
+++ 
b/test/unit/org/apache/cassandra/cql3/validation/operations/SelectSingleColumnRelationTest.java
@@ -25,6 +25,7 @@ import org.junit.Test;
 
 import org.apache.cassandra.Util;
 import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.cql3.Relation;
 import org.apache.cassandra.cql3.restrictions.StatementRestrictions;
 
 public class SelectSingleColumnRelationTest extends CQLTester
@@ -33,7 +34,7 @@ public class SelectSingleColumnRelationTest extends CQLTester
     public void textInvalidMapEntryPredicate()  throws Throwable
     {
         createTable("CREATE TABLE %s (pk int, ck frozen<map<int, int>>, v int, 
PRIMARY KEY(pk, ck)) WITH CLUSTERING ORDER BY (ck DESC)");
-        assertInvalidMessage("Map-entry predicates on frozen map column ck are 
not supported",
+        
assertInvalidMessage(String.format(Relation.FROZEN_MAP_ENTRY_PREDICATES_NOT_SUPPORTED,
 "ck"),
                              "SELECT * FROM %s WHERE pk=? AND ck[0] = ?", 0, 
0);
     }
 
diff --git 
a/test/unit/org/apache/cassandra/cql3/validation/operations/SelectTest.java 
b/test/unit/org/apache/cassandra/cql3/validation/operations/SelectTest.java
index 4fe87a5832..c511c72d2c 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/SelectTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/SelectTest.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.Util;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.cql3.Duration;
+import org.apache.cassandra.cql3.Relation;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.cql3.restrictions.StatementRestrictions;
 import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -1646,11 +1647,12 @@ public class SelectTest extends CQLTester
                        row(1, 2, list(1, 6), set(2, 12), map(1, 6)),
                        row(1, 4, list(1, 2), set(2, 4), map(1, 2)));
 
-            assertInvalidMessage("Map-entry predicates on frozen map column e 
are not supported",
-                                 "SELECT * FROM %s WHERE e[1] = 6 ALLOW 
FILTERING");
+            // CASSANDRA-18492: Allow filtering works with frozen map[key]
+            assertRows(execute("SELECT * FROM %s WHERE e[1] = 6 ALLOW 
FILTERING"),
+                       row(1, 2, list(1, 6), set(2, 12), map(1, 6)));
 
-            assertInvalidMessage("Map-entry predicates on frozen map column e 
are not supported",
-                                 "SELECT * FROM %s WHERE e[1] != 6 ALLOW 
FILTERING");
+            assertRows(execute("SELECT * FROM %s WHERE e[1] != 6 ALLOW 
FILTERING"),
+                       row(1, 4, list(1, 2), set(2, 4), map(1, 2)));
 
             assertRows(execute("SELECT * FROM %s WHERE e CONTAINS KEY 1 AND e 
CONTAINS 2 ALLOW FILTERING"),
                        row(1, 4, list(1, 2), set(2, 4), map(1, 2)));
@@ -1674,9 +1676,9 @@ public class SelectTest extends CQLTester
                              "SELECT * FROM %s WHERE e CONTAINS null ALLOW 
FILTERING");
         assertInvalidMessage("Invalid null value for column e",
                              "SELECT * FROM %s WHERE e CONTAINS KEY null ALLOW 
FILTERING");
-        assertInvalidMessage("Map-entry predicates on frozen map column e are 
not supported",
+        assertInvalidMessage("Invalid null map key for column e",
                              "SELECT * FROM %s WHERE e[null] = 2 ALLOW 
FILTERING");
-        assertInvalidMessage("Map-entry predicates on frozen map column e are 
not supported",
+        assertInvalidMessage("Invalid null value for e[1]",
                              "SELECT * FROM %s WHERE e[1] = null ALLOW 
FILTERING");
 
         // Checks filtering with unset
@@ -1701,10 +1703,10 @@ public class SelectTest extends CQLTester
         assertInvalidMessage("Invalid unset value for column e",
                              "SELECT * FROM %s WHERE e CONTAINS KEY ? ALLOW 
FILTERING",
                              unset());
-        assertInvalidMessage("Map-entry predicates on frozen map column e are 
not supported",
+        assertInvalidMessage("Invalid unset map key for column e",
                              "SELECT * FROM %s WHERE e[?] = 2 ALLOW FILTERING",
                              unset());
-        assertInvalidMessage("Map-entry predicates on frozen map column e are 
not supported",
+        assertInvalidMessage("Invalid unset value for e[1]",
                              "SELECT * FROM %s WHERE e[1] = ? ALLOW FILTERING",
                              unset());
     }
@@ -3556,7 +3558,7 @@ public class SelectTest extends CQLTester
                              "NOT_CONTAINS_KEY or map-entry equality if it 
already restricted by one of those",
                              "SELECT * FROM %s WHERE fm > {'lmn' : 'f'} AND fm 
CONTAINS KEY 'lmn'");
 
-        assertInvalidMessage("Map-entry predicates on frozen map column fm are 
not supported",
+        
assertInvalidMessage(String.format(Relation.FROZEN_MAP_ENTRY_PREDICATES_NOT_SUPPORTED,
 "fm"),
                              "SELECT * FROM %s WHERE fm > {'lmn' : 'f'} AND 
fm['lmn'] = 'foo2'");
     }
 }
diff --git 
a/test/unit/org/apache/cassandra/db/AbstractReadQueryToCQLStringTest.java 
b/test/unit/org/apache/cassandra/db/AbstractReadQueryToCQLStringTest.java
index c7aac0c296..8fe1bfbc4c 100644
--- a/test/unit/org/apache/cassandra/db/AbstractReadQueryToCQLStringTest.java
+++ b/test/unit/org/apache/cassandra/db/AbstractReadQueryToCQLStringTest.java
@@ -657,7 +657,8 @@ public class AbstractReadQueryToCQLStringTest extends 
CQLTester
         test("SELECT * FROM %s WHERE u = {a: 'a', b: 1} ALLOW FILTERING");
         testInvalid("SELECT * FROM %s WHERE l['a'] = 'a' ALLOW FILTERING");
         testInvalid("SELECT * FROM %s WHERE s['a'] = 'a' ALLOW FILTERING");
-        testInvalid("SELECT * FROM %s WHERE m['a'] = 'a' ALLOW FILTERING");
+        // CASSANDRA-18492 Allow filtering works with map[key]
+        test("SELECT * FROM %s WHERE m['a'] = 'a' ALLOW FILTERING");
         testInvalid("SELECT * FROM %s WHERE u.a = 'a' ALLOW FILTERING");
         testInvalid("SELECT * FROM %s WHERE u.b = 0 ALLOW FILTERING");
         testInvalid("SELECT * FROM %s WHERE u.a = 'a' ANd u.b = 0 ALLOW 
FILTERING");
diff --git 
a/test/unit/org/apache/cassandra/index/sai/cql/CollectionIndexingTest.java 
b/test/unit/org/apache/cassandra/index/sai/cql/CollectionIndexingTest.java
index 296840616f..3c1b78533b 100644
--- a/test/unit/org/apache/cassandra/index/sai/cql/CollectionIndexingTest.java
+++ b/test/unit/org/apache/cassandra/index/sai/cql/CollectionIndexingTest.java
@@ -17,8 +17,15 @@
  */
 package org.apache.cassandra.index.sai.cql;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
+import java.util.UUID;
+
+import com.datastax.driver.core.ResultSet;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -26,6 +33,7 @@ import org.junit.Test;
 import org.apache.cassandra.cql3.restrictions.StatementRestrictions;
 import org.apache.cassandra.index.sai.SAITester;
 
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.Assert.assertEquals;
 
 /**
@@ -71,43 +79,51 @@ public class CollectionIndexingTest extends SAITester
     }
 
     @Test
-    public void indexFrozenList()
+    public void indexFrozenList() throws Throwable
     {
         createPopulatedFrozenList(createIndexDDL("FULL(value)"));
-        assertEquals(2, execute("SELECT * FROM %s WHERE value = ?", 
Arrays.asList(1, 2, 3)).size());
+        beforeAndAfterFlush(() -> {
+            assertEquals(2, execute("SELECT * FROM %s WHERE value = ?", 
Arrays.asList(1, 2, 3)).size());
+        });
     }
 
     @Test
     public void indexFrozenMap() throws Throwable
     {
         createPopulatedFrozenMap(createIndexDDL("FULL(value)"));
-        assertEquals(1, execute("SELECT * FROM %s WHERE value = ?", new 
HashMap<Integer, String>() {{
-            put(1, "v1");
-            put(2, "v2");
-        }}).size());
-
+        beforeAndAfterFlush(() -> {
+            assertEquals(1, execute("SELECT * FROM %s WHERE value = ?", new 
HashMap<Integer, String>() {{
+                put(1, "v1");
+                put(2, "v2");
+            }}).size());
+        });
     }
 
     @Test
     public void indexFrozenMapQueryKeys() throws Throwable
     {
         createPopulatedFrozenMap(createIndexDDL("FULL(value)"));
-        assertUnsupportedIndexOperator(2, "SELECT * FROM %s WHERE value 
contains key 1");
+        beforeAndAfterFlush(() -> {
+            assertUnsupportedIndexOperator(2, "SELECT * FROM %s WHERE value 
contains key 1");
+        });
     }
 
     @Test
     public void indexFrozenMapQueryValues() throws Throwable
     {
         createPopulatedFrozenMap(createIndexDDL("FULL(value)"));
-        assertUnsupportedIndexOperator(2, "SELECT * FROM %s WHERE value 
contains 'v1'");
+        beforeAndAfterFlush(() -> {
+            assertUnsupportedIndexOperator(2, "SELECT * FROM %s WHERE value 
contains 'v1'");
+        });
     }
 
     @Test
     public void indexFrozenMapQueryEntries() throws Throwable
     {
         createPopulatedFrozenMap(createIndexDDL("FULL(value)"));
-        assertInvalidMessage("Map-entry predicates on frozen map column value 
are not supported",
-                "SELECT * FROM %s WHERE value[1] = 'v1'");
+        beforeAndAfterFlush(() -> {
+            assertUnsupportedIndexOperator(2, "SELECT * FROM %s WHERE value[1] 
= 'v1'");
+        });
     }
 
     @Test
@@ -204,6 +220,1085 @@ public class CollectionIndexingTest extends SAITester
         assertRows(execute("SELECT k, v, m FROM %s WHERE v = 1 AND m CONTAINS 
KEY 2 AND m CONTAINS 3"), row);
     }
 
+    @Test
+    public void testFrozenListFullIndex() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, frozen_list 
frozen<list<int>>)");
+
+        execute("INSERT INTO %s (pk, frozen_list) VALUES (1, [1, 2, 3])");
+        execute("INSERT INTO %s (pk, frozen_list) VALUES (2, [1, 2, 3])");
+        execute("INSERT INTO %s (pk, frozen_list) VALUES (3, [4, 5, 6])");
+
+        assertEquals(2, execute("SELECT pk FROM %s WHERE frozen_list = ? ALLOW 
FILTERING", Arrays.asList(1, 2, 3)).size());
+
+        createIndex("CREATE INDEX ON %s(FULL(frozen_list)) USING 'sai'");
+
+        beforeAndAfterFlush(() -> {
+            ResultSet rows = executeNet("SELECT pk FROM %s WHERE frozen_list = 
?", Arrays.asList(1, 2, 3));
+            assertEquals(2, rows.all().size());
+        });
+    }
+
+    @Test
+    public void testFrozenListValuesIndex() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, frozen_list 
frozen<list<int>>)");
+
+        execute("INSERT INTO %s (pk, frozen_list) VALUES (1, [1, 2, 3])");
+        execute("INSERT INTO %s (pk, frozen_list) VALUES (2, [3, 4, 5])");
+        execute("INSERT INTO %s (pk, frozen_list) VALUES (3, [4, 5, 6])");
+
+        assertEquals(2, execute("SELECT pk FROM %s WHERE frozen_list CONTAINS 
3 ALLOW FILTERING").size());
+
+        createIndex("CREATE INDEX ON %s(VALUES(frozen_list)) USING 'sai'");
+
+        beforeAndAfterFlush(() -> {
+            ResultSet rows = executeNet("SELECT pk FROM %s WHERE frozen_list 
CONTAINS 3");
+            assertEquals(2, rows.all().size());
+        });
+    }
+
+    @Test
+    public void testFrozenSetFullIndex() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, frozen_set 
frozen<set<text>>)");
+
+        execute("INSERT INTO %s (pk, frozen_set) VALUES (1, {'apple', 
'banana'})");
+        execute("INSERT INTO %s (pk, frozen_set) VALUES (2, {'apple', 
'banana'})");
+        execute("INSERT INTO %s (pk, frozen_set) VALUES (3, {'cherry', 
'date'})");
+
+        assertEquals(2, execute("SELECT pk FROM %s WHERE frozen_set = ? ALLOW 
FILTERING", Arrays.asList("apple", "banana")).size());
+
+        createIndex("CREATE INDEX ON %s(FULL(frozen_set)) USING 'sai'");
+
+        beforeAndAfterFlush(() -> {
+            ResultSet rows = executeNet("SELECT pk FROM %s WHERE frozen_set = 
?", Arrays.asList("apple", "banana"));
+            assertEquals(2, rows.all().size());
+        });
+    }
+
+    @Test
+    public void testFrozenSetValuesIndex() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, frozen_set 
frozen<set<text>>)");
+
+        execute("INSERT INTO %s (pk, frozen_set) VALUES (1, {'apple', 
'banana'})");
+        execute("INSERT INTO %s (pk, frozen_set) VALUES (2, {'banana', 
'cherry'})");
+        execute("INSERT INTO %s (pk, frozen_set) VALUES (3, {'cherry', 
'date'})");
+
+        assertEquals(2, execute("SELECT pk FROM %s WHERE frozen_set CONTAINS 
'banana' ALLOW FILTERING").size());
+
+        createIndex("CREATE INDEX ON %s(VALUES(frozen_set)) USING 'sai'");
+
+        beforeAndAfterFlush(() -> {
+            ResultSet rows = executeNet("SELECT pk FROM %s WHERE frozen_set 
CONTAINS 'banana'");
+            assertEquals(2, rows.all().size());
+        });
+    }
+
+    @Test
+    public void testFrozenMapFullIndexEquality() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, frozen_map 
frozen<map<text, text>>)");
+
+
+        execute("INSERT INTO %s (pk, frozen_map) VALUES (1, {'k1': 'v1', 'k2': 
'v2'})");
+        execute("INSERT INTO %s (pk, frozen_map) VALUES (2, {'k1': 'v1', 'k2': 
'v2'})");
+        execute("INSERT INTO %s (pk, frozen_map) VALUES (3, {'k3': 'v3', 'k4': 
'v4'})");
+
+        assertEquals(2, execute("SELECT pk FROM %s WHERE frozen_map = ? ALLOW 
FILTERING", ImmutableMap.of("k1", "v1", "k2", "v2")).size());
+
+        createIndex("CREATE INDEX ON %s(FULL(frozen_map)) USING 'sai'");
+
+        beforeAndAfterFlush(() -> {
+            ResultSet rows = executeNet("SELECT pk FROM %s WHERE frozen_map = 
?", ImmutableMap.of("k1", "v1", "k2", "v2"));
+            assertEquals(2, rows.all().size());
+        });
+    }
+
+    @Test
+    public void testFrozenMapFullIndexMapEntryRequiresFiltering() throws 
Throwable
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, frozen_map 
frozen<map<int, text>>)");
+
+        execute("INSERT INTO %s (pk, frozen_map) VALUES (1, ?)", 
ImmutableMap.of(1, "v1", 2, "v2"));
+        execute("INSERT INTO %s (pk, frozen_map) VALUES (2, ?)", 
ImmutableMap.of(1, "v1", 3, "v3"));
+        execute("INSERT INTO %s (pk, frozen_map) VALUES (3, ?)", 
ImmutableMap.of(3, "v3", 4, "v4"));
+
+        assertEquals(2, execute("SELECT pk FROM %s WHERE frozen_map[1] = 'v1' 
ALLOW FILTERING").size());
+
+        createIndex("CREATE INDEX ON %s(FULL(frozen_map)) USING 'sai'");
+
+        beforeAndAfterFlush(() -> {
+            
assertInvalidMessage(StatementRestrictions.REQUIRES_ALLOW_FILTERING_MESSAGE,
+                                 "SELECT pk FROM %s WHERE frozen_map[1] = 
'v1'");
+
+            ResultSet rows = executeNet("SELECT pk FROM %s WHERE frozen_map[1] 
= 'v1' ALLOW FILTERING");
+            assertEquals(2, rows.all().size());
+        });
+    }
+
+    @Test
+    public void testFrozenMapEntriesIndexMapEntry() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, frozen_map 
frozen<map<int, text>>)");
+
+        execute("INSERT INTO %s (pk, frozen_map) VALUES (1, ?)", 
ImmutableMap.of(1, "v1", 2, "v2"));
+        execute("INSERT INTO %s (pk, frozen_map) VALUES (2, ?)", 
ImmutableMap.of(1, "v1", 3, "v3"));
+        execute("INSERT INTO %s (pk, frozen_map) VALUES (3, ?)", 
ImmutableMap.of(3, "v3", 4, "v4"));
+
+        assertEquals(2, execute("SELECT pk FROM %s WHERE frozen_map[1] = 'v1' 
ALLOW FILTERING").size());
+
+        createIndex("CREATE INDEX ON %s(ENTRIES(frozen_map)) USING 'sai'");
+
+        beforeAndAfterFlush(() -> {
+            ResultSet rows = executeNet("SELECT pk FROM %s WHERE frozen_map[1] 
= 'v1'");
+            assertEquals(2, rows.all().size());
+        });
+    }
+
+    @Test
+    public void testFrozenMapValuesIndex() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, frozen_map 
frozen<map<text, text>>)");
+
+        execute("INSERT INTO %s (pk, frozen_map) VALUES (1, {'k1': 'v1', 'k2': 
'v2'})");
+        execute("INSERT INTO %s (pk, frozen_map) VALUES (2, {'k3': 'v1', 'k4': 
'v3'})");
+        execute("INSERT INTO %s (pk, frozen_map) VALUES (3, {'k5': 'v3', 'k6': 
'v4'})");
+
+        assertEquals(2, execute("SELECT pk FROM %s WHERE frozen_map CONTAINS 
'v1' ALLOW FILTERING").size());
+
+        createIndex("CREATE INDEX ON %s(VALUES(frozen_map)) USING 'sai'");
+
+        beforeAndAfterFlush(() -> {
+            ResultSet rows = executeNet("SELECT pk FROM %s WHERE frozen_map 
CONTAINS 'v1'");
+            assertEquals(2, rows.all().size());
+        });
+    }
+
+    @Test
+    public void testFrozenMapKeysIndex() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, frozen_map 
frozen<map<text, text>>)");
+
+        execute("INSERT INTO %s (pk, frozen_map) VALUES (1, {'k1': 'v1', 'k2': 
'v2'})");
+        execute("INSERT INTO %s (pk, frozen_map) VALUES (2, {'k1': 'v3', 'k4': 
'v4'})");
+        execute("INSERT INTO %s (pk, frozen_map) VALUES (3, {'k5': 'v5', 'k6': 
'v6'})");
+
+        assertEquals(2, execute("SELECT pk FROM %s WHERE frozen_map CONTAINS 
KEY 'k1' ALLOW FILTERING").size());
+
+        createIndex("CREATE INDEX ON %s(KEYS(frozen_map)) USING 'sai'");
+
+        beforeAndAfterFlush(() -> {
+            ResultSet rows = executeNet("SELECT pk FROM %s WHERE frozen_map 
CONTAINS KEY 'k1'");
+            assertEquals(2, rows.all().size());
+        });
+    }
+
+    @Test
+    public void testDifferentIndexTypesOnDifferentColumns() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, " +
+                    "map_full frozen<map<text, text>>, " +
+                    "map_values frozen<map<text, text>>, " +
+                    "map_keys frozen<map<text, text>>, " +
+                    "map_entries frozen<map<text, text>>)");
+
+
+        execute("INSERT INTO %s (pk, map_full, map_values, map_keys, 
map_entries) " +
+                "VALUES (1, {'k1': 'v1', 'k2': 'v2'}, {'k1': 'v1', 'k2': 
'v2'}, {'k1': 'v1', 'k2': 'v2'}, {'k1': 'v1', 'k2': 'v2'})");
+        execute("INSERT INTO %s (pk, map_full, map_values, map_keys, 
map_entries) " +
+                "VALUES (2, {'k1': 'v1', 'k2': 'v2'}, {'k3': 'v1', 'k4': 
'v3'}, {'k1': 'v3', 'k5': 'v5'}, {'k1': 'v1', 'k6': 'v6'})");
+        execute("INSERT INTO %s (pk, map_full, map_values, map_keys, 
map_entries) " +
+                "VALUES (3, {'k3': 'v3', 'k4': 'v4'}, {'k5': 'v5', 'k6': 
'v6'}, {'k7': 'v7', 'k8': 'v8'}, {'k9': 'v9', 'k10': 'v10'})");
+
+        assertEquals(2,execute("SELECT pk FROM %s WHERE map_full = ? ALLOW 
FILTERING", ImmutableMap.of("k1", "v1", "k2", "v2")).size());
+        assertEquals(2,execute("SELECT pk FROM %s WHERE map_values CONTAINS 
'v1' ALLOW FILTERING").size());
+        assertEquals(2,execute("SELECT pk FROM %s WHERE map_keys CONTAINS KEY 
'k1' ALLOW FILTERING").size());
+        assertEquals(2,execute("SELECT pk FROM %s WHERE map_entries['k1'] = 
'v1' ALLOW FILTERING").size());
+
+        createIndex("CREATE INDEX idx_full ON %s(FULL(map_full)) USING 'sai'");
+        createIndex("CREATE INDEX idx_values ON %s(VALUES(map_values)) USING 
'sai'");
+        createIndex("CREATE INDEX idx_keys ON %s(KEYS(map_keys)) USING 'sai'");
+        createIndex("CREATE INDEX idx_entries ON %s(ENTRIES(map_entries)) 
USING 'sai'");
+
+        beforeAndAfterFlush(() -> {
+            ResultSet rows = executeNet("SELECT pk FROM %s WHERE map_full = 
?", ImmutableMap.of("k1", "v1", "k2", "v2"));
+            assertEquals(2, rows.all().size());
+
+            rows = executeNet("SELECT pk FROM %s WHERE map_values CONTAINS 
'v1'");
+            assertEquals(2, rows.all().size());
+
+            rows = executeNet("SELECT pk FROM %s WHERE map_keys CONTAINS KEY 
'k1'");
+            assertEquals(2, rows.all().size());
+
+            rows = executeNet("SELECT pk FROM %s WHERE map_entries['k1'] = 
'v1'");
+            assertEquals(2, rows.all().size());
+        });
+    }
+
+    @Test
+    public void testFullIndexDoesNotSupportContainsOperations() throws 
Throwable
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, frozen_list 
frozen<list<int>>)");
+
+        execute("INSERT INTO %s (pk, frozen_list) VALUES (1, [1, 2, 3])");
+
+        assertEquals(1, execute("SELECT pk FROM %s WHERE frozen_list CONTAINS 
1 ALLOW FILTERING").size());
+
+        createIndex("CREATE INDEX ON %s(FULL(frozen_list)) USING 'sai'");
+
+        beforeAndAfterFlush(() -> {
+            
assertInvalidMessage(StatementRestrictions.REQUIRES_ALLOW_FILTERING_MESSAGE,
+                                 "SELECT pk FROM %s WHERE frozen_list CONTAINS 
1");
+        });
+    }
+
+    @Test
+    public void testValuesIndexDoesNotSupportEquality() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, frozen_list 
frozen<list<int>>)");
+        createIndex("CREATE INDEX ON %s(VALUES(frozen_list)) USING 'sai'");
+
+        execute("INSERT INTO %s (pk, frozen_list) VALUES (1, [1, 2, 3])");
+
+        beforeAndAfterFlush(() -> {
+            
assertInvalidMessage(StatementRestrictions.REQUIRES_ALLOW_FILTERING_MESSAGE,
+                                 "SELECT pk FROM %s WHERE frozen_list = ?", 
Arrays.asList(1, 2, 3));
+        });
+    }
+
+    @Test
+    public void testMapEntryWithAllowFilteringDifferentKeyTypes() throws 
Throwable
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, frozen_map 
frozen<map<int, text>>)");
+        execute("INSERT INTO %s (pk, frozen_map) VALUES (1, ?)", 
ImmutableMap.of(1, "v1", 2, "v2"));
+
+        beforeAndAfterFlush(() -> {
+            ResultSet rows = executeNet("SELECT pk FROM %s WHERE frozen_map[1] 
= 'v1' ALLOW FILTERING");
+            assertEquals(1, rows.all().size());
+        });
+
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, frozen_map 
frozen<map<text, text>>)");
+        execute("INSERT INTO %s (pk, frozen_map) VALUES (1, ?)", 
ImmutableMap.of("k1", "v1", "k2", "v2"));
+
+        beforeAndAfterFlush(() -> {
+            ResultSet rows = executeNet("SELECT pk FROM %s WHERE 
frozen_map['k1'] = 'v1' ALLOW FILTERING");
+            assertEquals(1, rows.all().size());
+        });
+    }
+
+    @Test
+    public void testFrozenMapValuesIndexWithMapEntryQuery() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, frozen_map 
frozen<map<int, text>>)");
+        createIndex("CREATE INDEX ON %s(VALUES(frozen_map)) USING 'sai'");
+
+        execute("INSERT INTO %s (pk, frozen_map) VALUES (1, ?)", 
ImmutableMap.of(1, "v1", 2, "v2"));
+        execute("INSERT INTO %s (pk, frozen_map) VALUES (2, ?)", 
ImmutableMap.of(1, "v1", 3, "v3"));
+
+        beforeAndAfterFlush(() -> {
+            
assertInvalidMessage(StatementRestrictions.REQUIRES_ALLOW_FILTERING_MESSAGE,
+                                 "SELECT pk FROM %s WHERE frozen_map[1] = 
'v1'");
+
+            ResultSet rows = executeNet("SELECT pk FROM %s WHERE frozen_map[1] 
= 'v1' ALLOW FILTERING");
+            assertEquals(2, rows.all().size());
+        });
+    }
+
+    @Test
+    public void testFrozenMapKeysIndexWithMapEntryQuery() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, frozen_map 
frozen<map<int, text>>)");
+        createIndex("CREATE INDEX ON %s(KEYS(frozen_map)) USING 'sai'");
+
+        execute("INSERT INTO %s (pk, frozen_map) VALUES (1, ?)", 
ImmutableMap.of(1, "v1", 2, "v2"));
+        execute("INSERT INTO %s (pk, frozen_map) VALUES (2, ?)", 
ImmutableMap.of(1, "v1", 3, "v3"));
+
+        beforeAndAfterFlush(() -> {
+            
assertInvalidMessage(StatementRestrictions.REQUIRES_ALLOW_FILTERING_MESSAGE,
+                                 "SELECT pk FROM %s WHERE frozen_map[1] = 
'v1'");
+
+            ResultSet rows = executeNet("SELECT pk FROM %s WHERE frozen_map[1] 
= 'v1' ALLOW FILTERING");
+            assertEquals(2, rows.all().size());
+        });
+    }
+
+    @Test
+    public void testFrozenMapDefaultIndexType() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, frozen_map 
frozen<map<int, text>>)");
+        createIndex("CREATE INDEX ON %s(frozen_map) USING 'sai'");
+
+        execute("INSERT INTO %s (pk, frozen_map) VALUES (1, ?)", 
ImmutableMap.of(1, "v1", 2, "v2"));
+        execute("INSERT INTO %s (pk, frozen_map) VALUES (2, ?)", 
ImmutableMap.of(1, "v1", 3, "v3"));
+        execute("INSERT INTO %s (pk, frozen_map) VALUES (3, ?)", 
ImmutableMap.of(4, "v4", 5, "v5"));
+
+        beforeAndAfterFlush(() -> {
+            ResultSet rows = executeNet("SELECT pk FROM %s WHERE frozen_map 
CONTAINS 'v1'");
+            assertEquals(2, rows.all().size());
+
+            
assertInvalidMessage(StatementRestrictions.REQUIRES_ALLOW_FILTERING_MESSAGE,
+                                 "SELECT pk FROM %s WHERE frozen_map = ?", 
ImmutableMap.of(1, "v1", 2, "v2"));
+
+            
assertInvalidMessage(StatementRestrictions.REQUIRES_ALLOW_FILTERING_MESSAGE,
+                                 "SELECT pk FROM %s WHERE frozen_map[1] = 
'v1'");
+
+            rows = executeNet("SELECT pk FROM %s WHERE frozen_map[1] = 'v1' 
ALLOW FILTERING");
+            assertEquals(2, rows.all().size());
+        });
+    }
+
+    @Test
+    public void testMapEntryQueryWithNullAndEmptyCollections() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, frozen_map 
frozen<map<int, text>>)");
+        createIndex("CREATE INDEX ON %s(ENTRIES(frozen_map)) USING 'sai'");
+
+        execute("INSERT INTO %s (pk, frozen_map) VALUES (1, ?)", 
ImmutableMap.of(1, "v1", 2, "v2"));
+        execute("INSERT INTO %s (pk, frozen_map) VALUES (2, ?)", 
ImmutableMap.of(3, "v3", 4, "v4"));
+        execute("INSERT INTO %s (pk, frozen_map) VALUES (3, NULL)");
+        execute("INSERT INTO %s (pk, frozen_map) VALUES (4, ?)", 
ImmutableMap.of());
+        execute("INSERT INTO %s (pk, frozen_map) VALUES (5, ?)", 
ImmutableMap.of(1, "different_value"));
+
+        beforeAndAfterFlush(() -> {
+            ResultSet rows = executeNet("SELECT pk FROM %s WHERE frozen_map[1] 
= 'v1'");
+            assertEquals(1, rows.all().size());
+
+            rows = executeNet("SELECT pk FROM %s");
+            assertEquals(5, rows.all().size());
+        });
+    }
+
+    @Test
+    public void testFullIndexWithNullAndEmptyCollections() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, frozen_list 
frozen<list<int>>)");
+        createIndex("CREATE INDEX ON %s(FULL(frozen_list)) USING 'sai'");
+
+        execute("INSERT INTO %s (pk, frozen_list) VALUES (1, ?)", 
Arrays.asList(1, 2, 3));
+        execute("INSERT INTO %s (pk, frozen_list) VALUES (2, ?)", 
Arrays.asList(1, 2, 3));
+        execute("INSERT INTO %s (pk, frozen_list) VALUES (3, ?)", 
Arrays.asList(4, 5, 6));
+        execute("INSERT INTO %s (pk, frozen_list) VALUES (4, NULL)");
+        execute("INSERT INTO %s (pk, frozen_list) VALUES (5, ?)", List.of());
+
+        beforeAndAfterFlush(() -> {
+            ResultSet rows = executeNet("SELECT pk FROM %s WHERE frozen_list = 
?", Arrays.asList(1, 2, 3));
+            assertEquals(2, rows.all().size());
+
+            rows = executeNet("SELECT pk FROM %s WHERE frozen_list = ?", 
List.of());
+            assertEquals(1, rows.all().size());
+
+            rows = executeNet("SELECT pk FROM %s");
+            assertEquals(5, rows.all().size());
+        });
+    }
+
+    @Test
+    public void testFrozenCollectionsWithNumericTypes() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, " +
+                    "frozen_list_bigint frozen<list<bigint>>, " +
+                    "frozen_set_smallint frozen<set<smallint>>, " +
+                    "frozen_map_tinyint frozen<map<tinyint, text>>, " +
+                    "frozen_list_varint frozen<list<varint>>)");
+
+        execute("INSERT INTO %s (pk, frozen_list_bigint, frozen_set_smallint, 
frozen_map_tinyint, frozen_list_varint) " +
+                "VALUES (1, [1, 2, 3], {10, 20}, {1: 'v1', 2: 'v2'}, [100, 
200])");
+        execute("INSERT INTO %s (pk, frozen_list_bigint, frozen_set_smallint, 
frozen_map_tinyint, frozen_list_varint) " +
+                "VALUES (2, [1, 2, 3], {30, 40}, {3: 'v3'}, [300])");
+        execute("INSERT INTO %s (pk, frozen_list_bigint, frozen_set_smallint, 
frozen_map_tinyint, frozen_list_varint) " +
+                "VALUES (3, [], {}, {}, [])");
+
+        assertEquals(2, execute("SELECT pk FROM %s WHERE frozen_list_bigint = 
? ALLOW FILTERING", Arrays.asList(1L, 2L, 3L)).size());
+        assertEquals(1, execute("SELECT pk FROM %s WHERE frozen_set_smallint 
CONTAINS ? ALLOW FILTERING", (short)10).size());
+        assertEquals(1, execute("SELECT pk FROM %s WHERE frozen_map_tinyint[?] 
= 'v1' ALLOW FILTERING", (byte)1).size());
+        assertEquals(1, execute("SELECT pk FROM %s WHERE frozen_list_varint 
CONTAINS 100 ALLOW FILTERING").size());
+
+        createIndex("CREATE INDEX ON %s(FULL(frozen_list_bigint)) USING 
'sai'");
+        createIndex("CREATE INDEX ON %s(VALUES(frozen_set_smallint)) USING 
'sai'");
+        createIndex("CREATE INDEX ON %s(ENTRIES(frozen_map_tinyint)) USING 
'sai'");
+        createIndex("CREATE INDEX ON %s(VALUES(frozen_list_varint)) USING 
'sai'");
+
+        beforeAndAfterFlush(() -> {
+            ResultSet rows = executeNet("SELECT pk FROM %s WHERE 
frozen_list_bigint = ?", Arrays.asList(1L, 2L, 3L));
+            assertEquals(2, rows.all().size());
+
+            rows = executeNet("SELECT pk FROM %s WHERE frozen_set_smallint 
CONTAINS ?", (short)10);
+            assertEquals(1, rows.all().size());
+
+            rows = executeNet("SELECT pk FROM %s WHERE frozen_map_tinyint[?] = 
'v1'", (byte)1);
+            assertEquals(1, rows.all().size());
+
+            rows = executeNet("SELECT pk FROM %s WHERE frozen_list_varint 
CONTAINS 100");
+            assertEquals(1, rows.all().size());
+        });
+    }
+
+    @Test
+    public void testFrozenCollectionsWithDecimalTypes() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, " +
+                    "frozen_list_float frozen<list<float>>, " +
+                    "frozen_set_double frozen<set<double>>, " +
+                    "frozen_map_decimal frozen<map<decimal, text>>)");
+
+        execute("INSERT INTO %s (pk, frozen_list_float, frozen_set_double, 
frozen_map_decimal) " +
+                "VALUES (1, [1.1, 2.2], {10.5, 20.5}, {1.0: 'v1', 2.0: 
'v2'})");
+        execute("INSERT INTO %s (pk, frozen_list_float, frozen_set_double, 
frozen_map_decimal) " +
+                "VALUES (2, [1.1, 2.2], {30.5}, {3.0: 'v3'})");
+        execute("INSERT INTO %s (pk, frozen_list_float, frozen_set_double, 
frozen_map_decimal) " +
+                "VALUES (3, [], {}, {})");
+
+        assertEquals(2, execute("SELECT pk FROM %s WHERE frozen_list_float = ? 
ALLOW FILTERING", Arrays.asList(1.1f, 2.2f)).size());
+        assertEquals(1, execute("SELECT pk FROM %s WHERE frozen_set_double 
CONTAINS 10.5 ALLOW FILTERING").size());
+        assertEquals(1, execute("SELECT pk FROM %s WHERE 
frozen_map_decimal[1.0] = 'v1' ALLOW FILTERING").size());
+
+        createIndex("CREATE INDEX ON %s(FULL(frozen_list_float)) USING 'sai'");
+        createIndex("CREATE INDEX ON %s(VALUES(frozen_set_double)) USING 
'sai'");
+        createIndex("CREATE INDEX ON %s(ENTRIES(frozen_map_decimal)) USING 
'sai'");
+
+        beforeAndAfterFlush(() -> {
+            ResultSet rows = executeNet("SELECT pk FROM %s WHERE 
frozen_list_float = ?", Arrays.asList(1.1f, 2.2f));
+            assertEquals(2, rows.all().size());
+
+            rows = executeNet("SELECT pk FROM %s WHERE frozen_set_double 
CONTAINS 10.5");
+            assertEquals(1, rows.all().size());
+
+            rows = executeNet("SELECT pk FROM %s WHERE frozen_map_decimal[1.0] 
= 'v1'");
+            assertEquals(1, rows.all().size());
+        });
+    }
+
+    @Test
+    public void testFrozenCollectionsWithTextVariants() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, " +
+                    "frozen_list_varchar frozen<list<varchar>>, " +
+                    "frozen_set_ascii frozen<set<ascii>>, " +
+                    "frozen_map_text frozen<map<text, varchar>>)");
+
+        execute("INSERT INTO %s (pk, frozen_list_varchar, frozen_set_ascii, 
frozen_map_text) " +
+                "VALUES (1, ['apple', 'banana'], {'hello', 'world'}, {'key1': 
'value1', 'key2': 'value2'})");
+        execute("INSERT INTO %s (pk, frozen_list_varchar, frozen_set_ascii, 
frozen_map_text) " +
+                "VALUES (2, ['apple', 'banana'], {'test'}, {'key3': 
'value3'})");
+        execute("INSERT INTO %s (pk, frozen_list_varchar, frozen_set_ascii, 
frozen_map_text) " +
+                "VALUES (3, [], {}, {})");
+
+        assertEquals(2, execute("SELECT pk FROM %s WHERE frozen_list_varchar = 
? ALLOW FILTERING", Arrays.asList("apple", "banana")).size());
+        assertEquals(1, execute("SELECT pk FROM %s WHERE frozen_set_ascii 
CONTAINS 'hello' ALLOW FILTERING").size());
+        assertEquals(1, execute("SELECT pk FROM %s WHERE 
frozen_map_text['key1'] = 'value1' ALLOW FILTERING").size());
+
+        createIndex("CREATE INDEX ON %s(FULL(frozen_list_varchar)) USING 
'sai'");
+        createIndex("CREATE INDEX ON %s(VALUES(frozen_set_ascii)) USING 
'sai'");
+        createIndex("CREATE INDEX ON %s(ENTRIES(frozen_map_text)) USING 
'sai'");
+
+        beforeAndAfterFlush(() -> {
+            ResultSet rows = executeNet("SELECT pk FROM %s WHERE 
frozen_list_varchar = ?", Arrays.asList("apple", "banana"));
+            assertEquals(2, rows.all().size());
+
+            rows = executeNet("SELECT pk FROM %s WHERE frozen_set_ascii 
CONTAINS 'hello'");
+            assertEquals(1, rows.all().size());
+
+            rows = executeNet("SELECT pk FROM %s WHERE frozen_map_text['key1'] 
= 'value1'");
+            assertEquals(1, rows.all().size());
+        });
+    }
+
+    @Test
+    public void testFrozenCollectionsWithTimeTypes() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, " +
+                    "frozen_list_timestamp frozen<list<timestamp>>, " +
+                    "frozen_set_date frozen<set<date>>, " +
+                    "frozen_map_time frozen<map<time, text>>, " +
+                    "frozen_list_timeuuid frozen<list<timeuuid>>)");
+
+        execute("INSERT INTO %s (pk, frozen_list_timestamp, frozen_set_date, 
frozen_map_time, frozen_list_timeuuid) " +
+                "VALUES (1, ['2023-01-01 00:00:00+0000', '2023-01-02 
00:00:00+0000'], {'2023-01-01', '2023-01-02'}, " +
+                "{'12:00:00': 'noon', '18:00:00': 'evening'}, 
[50554d6e-29bb-11e5-b345-feff819cdc9f])");
+        execute("INSERT INTO %s (pk, frozen_list_timestamp, frozen_set_date, 
frozen_map_time, frozen_list_timeuuid) " +
+                "VALUES (2, ['2023-01-01 00:00:00+0000', '2023-01-02 
00:00:00+0000'], {'2023-01-03'}, " +
+                "{'06:00:00': 'morning'}, 
[50554d6e-29bb-11e5-b345-feff819cdc9f])");
+        execute("INSERT INTO %s (pk, frozen_list_timestamp, frozen_set_date, 
frozen_map_time, frozen_list_timeuuid) " +
+                "VALUES (3, [], {}, {}, [])");
+
+        assertEquals(2, execute("SELECT pk FROM %s WHERE frozen_list_timestamp 
= ['2023-01-01 00:00:00+0000', '2023-01-02 00:00:00+0000'] ALLOW 
FILTERING").size());
+        assertEquals(1, execute("SELECT pk FROM %s WHERE frozen_set_date 
CONTAINS '2023-01-01' ALLOW FILTERING").size());
+        assertEquals(1, execute("SELECT pk FROM %s WHERE 
frozen_map_time['12:00:00'] = 'noon' ALLOW FILTERING").size());
+        assertEquals(2, execute("SELECT pk FROM %s WHERE frozen_list_timeuuid 
CONTAINS 50554d6e-29bb-11e5-b345-feff819cdc9f ALLOW FILTERING").size());
+
+        createIndex("CREATE INDEX ON %s(FULL(frozen_list_timestamp)) USING 
'sai'");
+        createIndex("CREATE INDEX ON %s(VALUES(frozen_set_date)) USING 'sai'");
+        createIndex("CREATE INDEX ON %s(ENTRIES(frozen_map_time)) USING 
'sai'");
+        createIndex("CREATE INDEX ON %s(VALUES(frozen_list_timeuuid)) USING 
'sai'");
+
+        beforeAndAfterFlush(() -> {
+            ResultSet rows = executeNet("SELECT pk FROM %s WHERE 
frozen_list_timestamp = ['2023-01-01 00:00:00+0000', '2023-01-02 
00:00:00+0000']");
+            assertEquals(2, rows.all().size());
+
+            rows = executeNet("SELECT pk FROM %s WHERE frozen_set_date 
CONTAINS '2023-01-01'");
+            assertEquals(1, rows.all().size());
+
+            rows = executeNet("SELECT pk FROM %s WHERE 
frozen_map_time['12:00:00'] = 'noon'");
+            assertEquals(1, rows.all().size());
+
+            rows = executeNet("SELECT pk FROM %s WHERE frozen_list_timeuuid 
CONTAINS 50554d6e-29bb-11e5-b345-feff819cdc9f");
+            assertEquals(2, rows.all().size());
+        });
+    }
+
+    @Test
+    public void testFrozenCollectionsWithOtherTypes() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, " +
+                    "frozen_list_uuid frozen<list<uuid>>, " +
+                    "frozen_set_boolean frozen<set<boolean>>, " +
+                    "frozen_map_inet frozen<map<inet, text>>)");
+
+        execute("INSERT INTO %s (pk, frozen_list_uuid, frozen_set_boolean, 
frozen_map_inet) " +
+                "VALUES (1, [550e8400-e29b-41d4-a716-446655440000, 
550e8400-e29b-41d4-a716-446655440001], " +
+                "{true, false}, {'127.0.0.1': 'localhost', '192.168.1.1': 
'router'})");
+        execute("INSERT INTO %s (pk, frozen_list_uuid, frozen_set_boolean, 
frozen_map_inet) " +
+                "VALUES (2, [550e8400-e29b-41d4-a716-446655440000, 
550e8400-e29b-41d4-a716-446655440001], " +
+                "{true}, {'10.0.0.1': 'server'})");
+        execute("INSERT INTO %s (pk, frozen_list_uuid, frozen_map_inet) " +
+                "VALUES (3, [], {})");
+
+        assertEquals(2, execute("SELECT pk FROM %s WHERE frozen_list_uuid = ? 
ALLOW FILTERING",
+                                
Arrays.asList(UUID.fromString("550e8400-e29b-41d4-a716-446655440000"),
+                                              
UUID.fromString("550e8400-e29b-41d4-a716-446655440001"))).size());
+        assertEquals(2, execute("SELECT pk FROM %s WHERE frozen_set_boolean 
CONTAINS true ALLOW FILTERING").size());
+        assertEquals(1, execute("SELECT pk FROM %s WHERE 
frozen_map_inet['127.0.0.1'] = 'localhost' ALLOW FILTERING").size());
+
+        createIndex("CREATE INDEX ON %s(FULL(frozen_list_uuid)) USING 'sai'");
+        createIndex("CREATE INDEX ON %s(VALUES(frozen_set_boolean)) USING 
'sai'");
+        createIndex("CREATE INDEX ON %s(ENTRIES(frozen_map_inet)) USING 
'sai'");
+
+        beforeAndAfterFlush(() -> {
+            ResultSet rows = executeNet("SELECT pk FROM %s WHERE 
frozen_list_uuid = ?",
+                                        
Arrays.asList(UUID.fromString("550e8400-e29b-41d4-a716-446655440000"),
+                                                      
UUID.fromString("550e8400-e29b-41d4-a716-446655440001")));
+            assertEquals(2, rows.all().size());
+
+            rows = executeNet("SELECT pk FROM %s WHERE frozen_set_boolean 
CONTAINS true");
+            assertEquals(2, rows.all().size());
+
+            rows = executeNet("SELECT pk FROM %s WHERE 
frozen_map_inet['127.0.0.1'] = 'localhost'");
+            assertEquals(1, rows.all().size());
+        });
+    }
+
+    @Test
+    public void testNonFrozenCollectionsIndexes()
+    {
+        createTable("CREATE TABLE %s (id int PRIMARY KEY, list_col 
list<text>)");
+        createIndex("CREATE INDEX ON %s(VALUES(list_col)) USING 'sai'");
+
+        execute("INSERT INTO %s (id, list_col) VALUES (1, ['apple', 
'banana'])");
+        execute("INSERT INTO %s (id, list_col) VALUES (2, ['cherry', 
'date'])");
+        execute("INSERT INTO %s (id, list_col) VALUES (3, ['apple', 'banana', 
'cherry'])");
+        flush();
+
+        ResultSet rows = executeNet("SELECT id FROM %s WHERE list_col CONTAINS 
'apple'");
+        assertEquals(2, rows.all().size());
+
+        createTable("CREATE TABLE %s (id int PRIMARY KEY, set_col set<text>)");
+        createIndex("CREATE INDEX ON %s(VALUES(set_col)) USING 'sai'");
+
+        execute("INSERT INTO %s (id, set_col) VALUES (1, {'apple', 
'banana'})");
+        execute("INSERT INTO %s (id, set_col) VALUES (2, {'cherry', 'date'})");
+        execute("INSERT INTO %s (id, set_col) VALUES (3, {'apple', 'banana', 
'cherry'})");
+        flush();
+
+        rows = executeNet("SELECT id FROM %s WHERE set_col CONTAINS 'apple'");
+        assertEquals(2, rows.all().size());
+
+        createTable("CREATE TABLE %s (id int PRIMARY KEY, map_col map<text, 
text>)");
+        createIndex("CREATE INDEX ON %s(VALUES(map_col)) USING 'sai'");
+        createIndex("CREATE INDEX ON %s(KEYS(map_col)) USING 'sai'");
+        createIndex("CREATE INDEX ON %s(ENTRIES(map_col)) USING 'sai'");
+
+        execute("INSERT INTO %s (id, map_col) VALUES (1, {'k1': 'v1', 'k2': 
'v2'})");
+        execute("INSERT INTO %s (id, map_col) VALUES (2, {'k3': 'v3', 'k4': 
'v4'})");
+        execute("INSERT INTO %s (id, map_col) VALUES (3, {'k1': 'v1', 'k2': 
'v2', 'k3': 'v3'})");
+        flush();
+
+        rows = executeNet("SELECT id FROM %s WHERE map_col CONTAINS 'v1'");
+        assertEquals(2, rows.all().size());
+
+        rows = executeNet("SELECT id FROM %s WHERE map_col CONTAINS key 'k1'");
+        assertEquals(2, rows.all().size());
+
+        rows = executeNet("SELECT id FROM %s WHERE map_col['k1'] = 'v1'");
+        assertEquals(2, rows.all().size());
+
+        execute("UPDATE %s SET map_col = map_col + {'k1': 'v1_updated'} WHERE 
id = 1");
+        flush();
+
+        rows = executeNet("SELECT id FROM %s WHERE map_col['k1'] = 
'v1_updated'");
+        assertEquals(1, rows.all().size());
+
+        rows = executeNet("SELECT id FROM %s WHERE map_col['k1'] = 'v1'");
+        assertEquals(1, rows.all().size());
+
+        execute("DELETE map_col['k1'] FROM %s WHERE id = 3");
+        flush();
+
+        rows = executeNet("SELECT id FROM %s WHERE map_col CONTAINS key 'k1'");
+        assertEquals(1, rows.all().size());
+
+        assertThatThrownBy(() -> executeNet("SELECT id FROM %s WHERE map_col = 
{'k1': 'v1'} ALLOW FILTERING"))
+        .isInstanceOf(Exception.class);
+    }
+
+    @Test
+    public void testSaiNonFrozenMap()
+    {
+        createTable("CREATE TABLE %s (id int PRIMARY KEY, frozen_map map<text, 
text>)");
+        createIndex("CREATE INDEX ON %s(ENTRIES(frozen_map)) USING 'sai'");
+
+        execute("INSERT INTO %s (id, frozen_map) VALUES (1, {'k1': 'v1', 'k2': 
'v2'})");
+        execute("INSERT INTO %s (id, frozen_map) VALUES (2, {'k3': 'v3', 'k4': 
'v4'})");
+        execute("INSERT INTO %s (id, frozen_map) VALUES (3, {'k1': 'v1', 'k2': 
'v2', 'k3': 'v3'})");
+        execute("INSERT INTO %s (id, frozen_map) VALUES (4, {'k1': 'v1', 'k2': 
'v2', 'k3': 'v3'})");
+        flush();
+
+        ResultSet rows = executeNet("SELECT id FROM %s WHERE 
frozen_map['k1']='v1';");
+        assertEquals(3, rows.all().size());
+    }
+
+    @Test
+    public void testFrozenListClusteringKeyWithValuesIndex() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck frozen<list<int>>, v int, 
PRIMARY KEY (pk, ck))");
+
+        execute("INSERT INTO %s (pk, ck, v) VALUES (1, [1, 2, 3], 100)");
+        execute("INSERT INTO %s (pk, ck, v) VALUES (1, [4, 5, 6], 200)");
+        execute("INSERT INTO %s (pk, ck, v) VALUES (2, [1, 7, 8], 300)");
+
+        assertEquals("Should find 2 rows containing value 1", 2,
+                     execute("SELECT pk, v FROM %s WHERE ck CONTAINS 1 ALLOW 
FILTERING").size());
+
+        createIndex("CREATE INDEX ON %s(VALUES(ck)) USING 'sai'");
+
+        beforeAndAfterFlush(() -> {
+            ResultSet rows = executeNet("SELECT pk, v FROM %s WHERE ck 
CONTAINS 1");
+            assertEquals("Should find 2 rows containing value 1", 2, 
rows.all().size());
+        });
+    }
+
+    @Test
+    public void testFrozenSetClusteringKeyWithValuesIndex() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck frozen<set<text>>, v int, 
PRIMARY KEY (pk, ck))");
+
+        execute("INSERT INTO %s (pk, ck, v) VALUES (1, {'a', 'b'}, 100)");
+        execute("INSERT INTO %s (pk, ck, v) VALUES (1, {'c', 'd'}, 200)");
+        execute("INSERT INTO %s (pk, ck, v) VALUES (2, {'a', 'e'}, 300)");
+
+        assertEquals("Should find 2 rows containing 'a'", 2,
+                     execute("SELECT pk, v FROM %s WHERE ck CONTAINS 'a' ALLOW 
FILTERING").size());
+
+        createIndex("CREATE INDEX ON %s(VALUES(ck)) USING 'sai'");
+
+        beforeAndAfterFlush(() -> {
+            ResultSet rows = executeNet("SELECT pk, v FROM %s WHERE ck 
CONTAINS 'a'");
+            assertEquals("Should find 2 rows containing 'a'", 2, 
rows.all().size());
+        });
+    }
+
+    @Test
+    public void testFrozenMapClusteringKeyWithKeysIndex() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck frozen<map<text, int>>, v 
int, PRIMARY KEY (pk, ck))");
+
+        execute("INSERT INTO %s (pk, ck, v) VALUES (1, {'x': 1, 'y': 2}, 
100)");
+        execute("INSERT INTO %s (pk, ck, v) VALUES (1, {'z': 3}, 200)");
+        execute("INSERT INTO %s (pk, ck, v) VALUES (2, {'x': 4}, 300)");
+
+        assertEquals("Should find 2 rows with key 'x'", 2,
+                     execute("SELECT pk, v FROM %s WHERE ck CONTAINS KEY 'x' 
ALLOW FILTERING").size());
+
+        createIndex("CREATE INDEX ON %s(KEYS(ck)) USING 'sai'");
+
+        beforeAndAfterFlush(() -> {
+            ResultSet rows = executeNet("SELECT pk, v FROM %s WHERE ck 
CONTAINS KEY 'x'");
+            assertEquals("Should find 2 rows with key 'x'", 2, 
rows.all().size());
+        });
+    }
+
+    @Test
+    public void testFrozenMapClusteringKeyWithValuesIndex() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck frozen<map<text, int>>, v 
int, PRIMARY KEY (pk, ck))");
+
+        execute("INSERT INTO %s (pk, ck, v) VALUES (1, {'x': 1, 'y': 2}, 
100)");
+        execute("INSERT INTO %s (pk, ck, v) VALUES (1, {'z': 3}, 200)");
+        execute("INSERT INTO %s (pk, ck, v) VALUES (2, {'x': 1}, 300)");
+
+        assertEquals("Should find 2 rows with value 1", 2,
+                     execute("SELECT pk, v FROM %s WHERE ck CONTAINS 1 ALLOW 
FILTERING").size());
+
+        createIndex("CREATE INDEX ON %s(VALUES(ck)) USING 'sai'");
+
+        beforeAndAfterFlush(() -> {
+            ResultSet rows = executeNet("SELECT pk, v FROM %s WHERE ck 
CONTAINS 1");
+            assertEquals("Should find 2 rows with value 1", 2, 
rows.all().size());
+        });
+    }
+
+    @Test
+    public void testFrozenMapClusteringKeyWithEntriesIndexNotAllowed() throws 
Throwable
+    {
+        // ENTRIES index on frozen map clustering column should not be allowed 
because
+        // map entry predicates (ck[key] = value) are not supported on 
clustering columns,
+        // making the index unqueryable.
+        createTable("CREATE TABLE %s (pk int, ck frozen<map<int, text>>, v 
int, PRIMARY KEY (pk, ck))");
+        assertInvalidMessage("Cannot create ENTRIES index on frozen map 
clustering column",
+                             "CREATE INDEX ON %s(ENTRIES(ck)) USING 'sai'");
+    }
+
+    @Test
+    public void testFrozenListAsSecondClusteringKey() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck1 int, ck2 frozen<list<int>>, 
v int, PRIMARY KEY (pk, ck1, ck2))");
+
+        execute("INSERT INTO %s (pk, ck1, ck2, v) VALUES (1, 1, [1, 2, 3], 
100)");
+        execute("INSERT INTO %s (pk, ck1, ck2, v) VALUES (1, 2, [4, 5, 6], 
200)");
+        execute("INSERT INTO %s (pk, ck1, ck2, v) VALUES (2, 1, [1, 7, 8], 
300)");
+
+        assertEquals("Should find 2 rows containing value 1", 2,
+                     execute("SELECT pk, ck1, v FROM %s WHERE ck2 CONTAINS 1 
ALLOW FILTERING").size());
+
+        createIndex("CREATE INDEX ON %s(VALUES(ck2)) USING 'sai'");
+
+        beforeAndAfterFlush(() -> {
+            ResultSet rows = executeNet("SELECT pk, ck1, v FROM %s WHERE ck2 
CONTAINS 1");
+            assertEquals("Should find 2 rows containing value 1", 2, 
rows.all().size());
+        });
+    }
+
+    @Test
+    public void testFrozenListClusteringKeyWithEmptyCollections() throws 
Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck frozen<list<int>>, v int, 
PRIMARY KEY (pk, ck))");
+
+        execute("INSERT INTO %s (pk, ck, v) VALUES (1, [], 100)");
+        execute("INSERT INTO %s (pk, ck, v) VALUES (2, [1, 2], 200)");
+
+        assertEquals("Should find 1 row containing value 1", 1,
+                     execute("SELECT pk FROM %s WHERE ck CONTAINS 1 ALLOW 
FILTERING").size());
+
+        createIndex("CREATE INDEX ON %s(VALUES(ck)) USING 'sai'");
+
+        beforeAndAfterFlush(() -> {
+            ResultSet rows = executeNet("SELECT pk FROM %s WHERE ck CONTAINS 
1");
+            assertEquals("Should find 1 row containing value 1", 1, 
rows.all().size());
+
+            rows = executeNet("SELECT pk FROM %s");
+            assertEquals("Should have 2 rows total", 2, rows.all().size());
+        });
+    }
+
+    @Test
+    public void testFrozenListClusteringKeyWithFullIndex() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck frozen<list<int>>, v int, 
PRIMARY KEY (pk, ck))");
+
+        execute("INSERT INTO %s (pk, ck, v) VALUES (1, [1, 2, 3], 100)");
+        execute("INSERT INTO %s (pk, ck, v) VALUES (2, [1, 2, 3], 200)");
+        execute("INSERT INTO %s (pk, ck, v) VALUES (3, [4, 5, 6], 300)");
+
+        // Equality on clustering column works without index
+        assertEquals("Should find 2 rows with list [1, 2, 3]", 2,
+                     execute("SELECT pk FROM %s WHERE ck = ? ALLOW FILTERING", 
Arrays.asList(1, 2, 3)).size());
+
+        createIndex("CREATE INDEX ON %s(FULL(ck)) USING 'sai'");
+
+        beforeAndAfterFlush(() -> {
+            ResultSet rows = executeNet("SELECT pk FROM %s WHERE ck = ?", 
Arrays.asList(1, 2, 3));
+            assertEquals("Should find 2 rows with list [1, 2, 3]", 2, 
rows.all().size());
+        });
+    }
+
+    @Test
+    public void testFrozenSetClusteringKeyWithFullIndex() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck frozen<set<int>>, v int, 
PRIMARY KEY (pk, ck))");
+
+        execute("INSERT INTO %s (pk, ck, v) VALUES (1, {1, 2, 3}, 100)");
+        execute("INSERT INTO %s (pk, ck, v) VALUES (2, {1, 2, 3}, 200)");
+        execute("INSERT INTO %s (pk, ck, v) VALUES (3, {4, 5}, 300)");
+
+        // Equality on clustering column works without index
+        assertEquals("Should find 2 rows with set {1, 2, 3}", 2,
+                     execute("SELECT pk FROM %s WHERE ck = ? ALLOW FILTERING", 
ImmutableSet.of(1, 2, 3)).size());
+
+        createIndex("CREATE INDEX ON %s(FULL(ck)) USING 'sai'");
+
+        beforeAndAfterFlush(() -> {
+            ResultSet rows = executeNet("SELECT pk FROM %s WHERE ck = ?", 
ImmutableSet.of(1, 2, 3));
+            assertEquals("Should find 2 rows with set {1, 2, 3}", 2, 
rows.all().size());
+        });
+    }
+
+    @Test
+    public void testFrozenMapClusteringKeyWithFullIndex() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck frozen<map<text, int>>, v 
int, PRIMARY KEY (pk, ck))");
+
+        execute("INSERT INTO %s (pk, ck, v) VALUES (1, {'a': 1, 'b': 2}, 
100)");
+        execute("INSERT INTO %s (pk, ck, v) VALUES (2, {'a': 1, 'b': 2}, 
200)");
+        execute("INSERT INTO %s (pk, ck, v) VALUES (3, {'c': 3}, 300)");
+
+        // Equality on clustering column works without index
+        assertEquals("Should find 2 rows with map {'a': 1, 'b': 2}", 2,
+                     execute("SELECT pk FROM %s WHERE ck = ? ALLOW FILTERING", 
ImmutableMap.of("a", 1, "b", 2)).size());
+
+        createIndex("CREATE INDEX ON %s(FULL(ck)) USING 'sai'");
+
+        beforeAndAfterFlush(() -> {
+            ResultSet rows = executeNet("SELECT pk FROM %s WHERE ck = ?", 
ImmutableMap.of("a", 1, "b", 2));
+            assertEquals("Should find 2 rows with map {'a': 1, 'b': 2}", 2, 
rows.all().size());
+        });
+    }
+
+    @Test
+    public void testFrozenListClusteringKeyDelete() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck frozen<list<int>>, v int, 
PRIMARY KEY (pk, ck))");
+
+        execute("INSERT INTO %s (pk, ck, v) VALUES (1, [1, 2, 3], 100)");
+        execute("INSERT INTO %s (pk, ck, v) VALUES (2, [1, 4, 5], 200)");
+
+        assertEquals("Should have 2 rows containing value 1", 2,
+                     execute("SELECT pk FROM %s WHERE ck CONTAINS 1 ALLOW 
FILTERING").size());
+
+        createIndex("CREATE INDEX ON %s(VALUES(ck)) USING 'sai'");
+
+        ResultSet rows = executeNet("SELECT pk FROM %s WHERE ck CONTAINS 1");
+        assertEquals("Should have 2 rows containing value 1", 2, 
rows.all().size());
+
+        execute("DELETE FROM %s WHERE pk = 1 AND ck = [1, 2, 3]");
+
+        beforeAndAfterFlush(() -> {
+            ResultSet rows2 = executeNet("SELECT pk FROM %s WHERE ck CONTAINS 
1");
+            assertEquals("After delete, should have 1 row containing value 1", 
1, rows2.all().size());
+        });
+    }
+
+    @Test
+    public void testFrozenListClusteringKeyMultiplePartitions() throws 
Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck frozen<list<int>>, v int, 
PRIMARY KEY (pk, ck))");
+
+        execute("INSERT INTO %s (pk, ck, v) VALUES (1, [1, 2], 100)");
+        execute("INSERT INTO %s (pk, ck, v) VALUES (2, [1, 3], 200)");
+        execute("INSERT INTO %s (pk, ck, v) VALUES (3, [1, 4], 300)");
+        execute("INSERT INTO %s (pk, ck, v) VALUES (4, [5, 6], 400)");
+
+        assertEquals("Should have 3 rows containing value 1 across different 
partitions", 3,
+                     execute("SELECT pk FROM %s WHERE ck CONTAINS 1 ALLOW 
FILTERING").size());
+
+        createIndex("CREATE INDEX ON %s(VALUES(ck)) USING 'sai'");
+
+        beforeAndAfterFlush(() -> {
+            ResultSet rows = executeNet("SELECT pk FROM %s WHERE ck CONTAINS 
1");
+            assertEquals("Should have 3 rows containing value 1 across 
different partitions", 3, rows.all().size());
+        });
+    }
+
+    @Test
+    public void testFrozenListClusteringKeyWithDuplicateValues() throws 
Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck frozen<list<int>>, v int, 
PRIMARY KEY (pk, ck))");
+
+        execute("INSERT INTO %s (pk, ck, v) VALUES (1, [1, 1, 1, 2], 100)");
+        execute("INSERT INTO %s (pk, ck, v) VALUES (2, [1, 3], 200)");
+
+        assertEquals("Should find 2 rows containing value 1 (even with 
duplicates)", 2,
+                     execute("SELECT pk FROM %s WHERE ck CONTAINS 1 ALLOW 
FILTERING").size());
+
+        createIndex("CREATE INDEX ON %s(VALUES(ck)) USING 'sai'");
+
+        beforeAndAfterFlush(() -> {
+            ResultSet rows = executeNet("SELECT pk FROM %s WHERE ck CONTAINS 
1");
+            assertEquals("Should find 2 rows containing value 1 (even with 
duplicates)", 2, rows.all().size());
+        });
+    }
+
+    @Test
+    public void testFrozenListClusteringKeyWithLargeCollection() throws 
Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck frozen<list<int>>, v int, 
PRIMARY KEY (pk, ck))");
+
+        List<Integer> largeList = new ArrayList<>();
+        for (int i = 0; i < 100; i++)
+        {
+            largeList.add(i);
+        }
+
+        execute("INSERT INTO %s (pk, ck, v) VALUES (1, ?, 100)", largeList);
+        execute("INSERT INTO %s (pk, ck, v) VALUES (2, [99, 200], 200)");
+
+        assertEquals("Should find 2 rows containing value 99", 2,
+                     execute("SELECT pk FROM %s WHERE ck CONTAINS 99 ALLOW 
FILTERING").size());
+
+        createIndex("CREATE INDEX ON %s(VALUES(ck)) USING 'sai'");
+
+        beforeAndAfterFlush(() -> {
+            ResultSet rows = executeNet("SELECT pk FROM %s WHERE ck CONTAINS 
99");
+            assertEquals("Should find 2 rows containing value 99", 2, 
rows.all().size());
+        });
+    }
+
+    @Test
+    public void testFrozenListClusteringKeyWithNumericTypes() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, " +
+                    "ck1 frozen<list<bigint>>, " +
+                    "ck2 frozen<list<smallint>>, " +
+                    "ck3 frozen<list<varint>>, " +
+                    "v int, " +
+                    "PRIMARY KEY (pk, ck1, ck2, ck3))");
+
+        execute("INSERT INTO %s (pk, ck1, ck2, ck3, v) VALUES (1, [1, 2], [10, 
20], [100, 200], 100)");
+        execute("INSERT INTO %s (pk, ck1, ck2, ck3, v) VALUES (2, [1, 3], [10, 
30], [100, 300], 200)");
+
+        assertEquals("Should find 2 rows with bigint value 1", 2,
+                     execute("SELECT pk FROM %s WHERE ck1 CONTAINS ? ALLOW 
FILTERING", 1L).size());
+        assertEquals("Should find 2 rows with smallint value 10", 2,
+                     execute("SELECT pk FROM %s WHERE ck2 CONTAINS ? ALLOW 
FILTERING", (short)10).size());
+        assertEquals("Should find 2 rows with varint value 100", 2,
+                     execute("SELECT pk FROM %s WHERE ck3 CONTAINS 100 ALLOW 
FILTERING").size());
+
+        createIndex("CREATE INDEX ON %s(VALUES(ck1)) USING 'sai'");
+        createIndex("CREATE INDEX ON %s(VALUES(ck2)) USING 'sai'");
+        createIndex("CREATE INDEX ON %s(VALUES(ck3)) USING 'sai'");
+
+        beforeAndAfterFlush(() -> {
+            ResultSet rows = executeNet("SELECT pk FROM %s WHERE ck1 CONTAINS 
?", 1L);
+            assertEquals("Should find 2 rows with bigint value 1", 2, 
rows.all().size());
+
+            rows = executeNet("SELECT pk FROM %s WHERE ck2 CONTAINS ?", 
(short)10);
+            assertEquals("Should find 2 rows with smallint value 10", 2, 
rows.all().size());
+
+            rows = executeNet("SELECT pk FROM %s WHERE ck3 CONTAINS 100");
+            assertEquals("Should find 2 rows with varint value 100", 2, 
rows.all().size());
+        });
+    }
+
+    @Test
+    public void testFrozenListClusteringKeyWithUUIDType() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck frozen<list<uuid>>, v int, 
PRIMARY KEY (pk, ck))");
+
+        UUID uuid1 = UUID.fromString("550e8400-e29b-41d4-a716-446655440000");
+        UUID uuid2 = UUID.fromString("550e8400-e29b-41d4-a716-446655440001");
+
+        execute("INSERT INTO %s (pk, ck, v) VALUES (1, ?, 100)", 
Arrays.asList(uuid1, uuid2));
+        execute("INSERT INTO %s (pk, ck, v) VALUES (2, ?, 200)", 
Arrays.asList(uuid1));
+
+        assertEquals("Should find 2 rows containing UUID", 2,
+                     execute("SELECT pk FROM %s WHERE ck CONTAINS ? ALLOW 
FILTERING", uuid1).size());
+
+        createIndex("CREATE INDEX ON %s(VALUES(ck)) USING 'sai'");
+
+        beforeAndAfterFlush(() -> {
+            ResultSet rows = executeNet("SELECT pk FROM %s WHERE ck CONTAINS 
?", uuid1);
+            assertEquals("Should find 2 rows containing UUID", 2, 
rows.all().size());
+        });
+    }
+
+    @Test
+    public void testFrozenListValuesIndexCaseInsensitive() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, frozen_list 
frozen<list<text>>)");
+
+        execute("INSERT INTO %s (pk, frozen_list) VALUES (1, ['Apple', 
'Banana'])");
+        execute("INSERT INTO %s (pk, frozen_list) VALUES (2, ['banana', 
'cherry'])");
+        execute("INSERT INTO %s (pk, frozen_list) VALUES (3, ['cherry', 
'date'])");
+
+        // Without index, case-sensitive CONTAINS finds only exact matches
+        assertEquals(1, execute("SELECT pk FROM %s WHERE frozen_list CONTAINS 
'Banana' ALLOW FILTERING").size());
+        assertEquals(1, execute("SELECT pk FROM %s WHERE frozen_list CONTAINS 
'banana' ALLOW FILTERING").size());
+
+        createIndex("CREATE INDEX ON %s(VALUES(frozen_list)) USING 'sai' WITH 
OPTIONS = { 'case_sensitive' : false }");
+
+        beforeAndAfterFlush(() -> {
+            // With case-insensitive index, 'banana' matches both 'Banana' and 
'banana'
+            ResultSet rows = executeNet("SELECT pk FROM %s WHERE frozen_list 
CONTAINS 'banana'");
+            assertEquals(2, rows.all().size());
+        });
+    }
+
+    @Test
+    public void testFrozenSetValuesIndexCaseInsensitive() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, frozen_set 
frozen<set<text>>)");
+
+        execute("INSERT INTO %s (pk, frozen_set) VALUES (1, {'apple', 
'Banana'})");
+        execute("INSERT INTO %s (pk, frozen_set) VALUES (2, {'banana', 
'cherry'})");
+        execute("INSERT INTO %s (pk, frozen_set) VALUES (3, {'cherry', 
'date'})");
+
+        // Without index, case-sensitive CONTAINS finds only exact matches
+        assertEquals(1, execute("SELECT pk FROM %s WHERE frozen_set CONTAINS 
'Banana' ALLOW FILTERING").size());
+        assertEquals(1, execute("SELECT pk FROM %s WHERE frozen_set CONTAINS 
'banana' ALLOW FILTERING").size());
+
+        createIndex("CREATE INDEX ON %s(VALUES(frozen_set)) USING 'sai' WITH 
OPTIONS = { 'case_sensitive' : false }");
+
+        beforeAndAfterFlush(() -> {
+            // With case-insensitive index, 'banana' matches both 'Banana' and 
'banana'
+            ResultSet rows = executeNet("SELECT pk FROM %s WHERE frozen_set 
CONTAINS 'banana'");
+            assertEquals(2, rows.all().size());
+        });
+    }
+
+    @Test
+    public void testFrozenMapValuesIndexCaseInsensitive() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, frozen_map 
frozen<map<text, text>>)");
+
+        execute("INSERT INTO %s (pk, frozen_map) VALUES (1, {'k1': 'Value1', 
'k2': 'v2'})");
+        execute("INSERT INTO %s (pk, frozen_map) VALUES (2, {'k3': 'value1', 
'k4': 'v3'})");
+        execute("INSERT INTO %s (pk, frozen_map) VALUES (3, {'k5': 'v3', 'k6': 
'v4'})");
+
+        // Without index, case-sensitive CONTAINS finds only exact matches
+        assertEquals(1, execute("SELECT pk FROM %s WHERE frozen_map CONTAINS 
'Value1' ALLOW FILTERING").size());
+        assertEquals(1, execute("SELECT pk FROM %s WHERE frozen_map CONTAINS 
'value1' ALLOW FILTERING").size());
+
+        createIndex("CREATE INDEX ON %s(VALUES(frozen_map)) USING 'sai' WITH 
OPTIONS = { 'case_sensitive' : false }");
+
+        beforeAndAfterFlush(() -> {
+            // With case-insensitive index, 'value1' matches both 'Value1' and 
'value1'
+            ResultSet rows = executeNet("SELECT pk FROM %s WHERE frozen_map 
CONTAINS 'value1'");
+            assertEquals(2, rows.all().size());
+        });
+    }
+
+    @Test
+    public void testFrozenMapKeysIndexCaseInsensitive() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, frozen_map 
frozen<map<text, text>>)");
+
+        execute("INSERT INTO %s (pk, frozen_map) VALUES (1, {'Key1': 'v1', 
'k2': 'v2'})");
+        execute("INSERT INTO %s (pk, frozen_map) VALUES (2, {'key1': 'v3', 
'k4': 'v4'})");
+        execute("INSERT INTO %s (pk, frozen_map) VALUES (3, {'k5': 'v5', 'k6': 
'v6'})");
+
+        // Without index, case-sensitive CONTAINS KEY finds only exact matches
+        assertEquals(1, execute("SELECT pk FROM %s WHERE frozen_map CONTAINS 
KEY 'Key1' ALLOW FILTERING").size());
+        assertEquals(1, execute("SELECT pk FROM %s WHERE frozen_map CONTAINS 
KEY 'key1' ALLOW FILTERING").size());
+
+        createIndex("CREATE INDEX ON %s(KEYS(frozen_map)) USING 'sai' WITH 
OPTIONS = { 'case_sensitive' : false }");
+
+        beforeAndAfterFlush(() -> {
+            // With case-insensitive index, 'key1' matches both 'Key1' and 
'key1'
+            ResultSet rows = executeNet("SELECT pk FROM %s WHERE frozen_map 
CONTAINS KEY 'key1'");
+            assertEquals(2, rows.all().size());
+        });
+    }
+
     private void createPopulatedMap(String createIndex)
     {
         createTable("CREATE TABLE %s (pk int primary key, value map<int, 
text>)");


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to