Repository: cassandra
Updated Branches:
  refs/heads/trunk b680ddd61 -> 1a73af768


Make custom filter expressions more extensible

Patch by Sam Tunnicliffe; reviewed by Sylvain Lebresne for
CASSANDRA-11295


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1a73af76
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1a73af76
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1a73af76

Branch: refs/heads/trunk
Commit: 1a73af7686e41cf8c2c0a031c9ef466b13a4b794
Parents: b680ddd
Author: Sam Tunnicliffe <[email protected]>
Authored: Thu Feb 25 15:32:34 2016 +0000
Committer: Sam Tunnicliffe <[email protected]>
Committed: Wed Apr 6 16:29:01 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   2 +
 .../cql3/restrictions/Restrictions.java         |   2 +-
 .../restrictions/StatementRestrictions.java     |   6 +
 .../cql3/statements/ModificationStatement.java  |   8 ++
 .../apache/cassandra/db/filter/RowFilter.java   | 144 ++++++++++++++++++-
 .../cassandra/index/SecondaryIndexManager.java  |   2 +-
 7 files changed, 156 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a73af76/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 95ff24c..e522035 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.6
+ * Make custom filtering more extensible with UserExpression (CASSANDRA-11295)
  * Improve field-checking and error reporting in cassandra.yaml 
(CASSANDRA-10649)
  * Print CAS stats in nodetool proxyhistograms (CASSANDRA-11507)
  * More user friendly error when providing an invalid token to nodetool 
(CASSANDRA-9348)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a73af76/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index cc3e9c2..e073592 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -23,6 +23,8 @@ New features
    - Startup is now aborted if corrupted transaction log files are found. The 
details
      of the affected log files are now logged, allowing the operator to decide 
how
      to resolve the situation.
+   - Filtering expressions are made more pluggable and can be added programmatically via
+     a QueryHandler implementation. See CASSANDRA-11295 for more details.
 
 3.4
 =====

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a73af76/src/java/org/apache/cassandra/cql3/restrictions/Restrictions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/Restrictions.java 
b/src/java/org/apache/cassandra/cql3/restrictions/Restrictions.java
index 705d66d..f46f176 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/Restrictions.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/Restrictions.java
@@ -29,7 +29,7 @@ import org.apache.cassandra.index.SecondaryIndexManager;
 /**
  * Sets of restrictions
  */
-interface Restrictions
+public interface Restrictions
 {
     /**
      * Returns the column definitions in position order.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a73af76/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java 
b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
index a35d86b..b00214c 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
@@ -270,6 +270,12 @@ public final class StatementRestrictions
                                 nonPrimaryKeyRestrictions.getFunctions());
     }
 
+    // may be used by QueryHandler implementations
+    public IndexRestrictions getIndexRestrictions()
+    {
+        return indexRestrictions;
+    }
+
     private void addSingleColumnRestriction(SingleColumnRestriction 
restriction)
     {
         ColumnDefinition def = restriction.columnDef;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a73af76/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 614e47a..06ff5d4 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -147,6 +147,14 @@ public abstract class ModificationStatement implements 
CQLStatement
                                 conditions.getFunctions());
     }
 
+    /*
+     * May be used by QueryHandler implementations
+     */
+    public StatementRestrictions getRestrictions()
+    {
+        return restrictions;
+    }
+
     public abstract void addUpdateForKey(PartitionUpdate update, Clustering 
clustering, UpdateParameters params);
 
     public abstract void addUpdateForKey(PartitionUpdate update, Slice slice, 
UpdateParameters params);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a73af76/src/java/org/apache/cassandra/db/filter/RowFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java 
b/src/java/org/apache/cassandra/db/filter/RowFilter.java
index fea8ea8..0ef29c2 100644
--- a/src/java/org/apache/cassandra/db/filter/RowFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java
@@ -20,15 +20,21 @@ package org.apache.cassandra.db.filter;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.base.Objects;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -53,6 +59,8 @@ import static 
org.apache.cassandra.cql3.statements.RequestValidations.checkNotNu
  */
 public abstract class RowFilter implements Iterable<RowFilter.Expression>
 {
+    private static final Logger logger = 
LoggerFactory.getLogger(RowFilter.class);
+
     public static final Serializer serializer = new Serializer();
     public static final RowFilter NONE = new 
CQLFilter(Collections.emptyList());
 
@@ -107,6 +115,11 @@ public abstract class RowFilter implements 
Iterable<RowFilter.Expression>
         expressions.add(expression);
     }
 
+    public void addUserExpression(UserExpression e)
+    {
+        expressions.add(e);
+    }
+
     public List<Expression> getExpressions()
     {
         return expressions;
@@ -245,13 +258,13 @@ public abstract class RowFilter implements 
Iterable<RowFilter.Expression>
                 DecoratedKey pk;
                 public UnfilteredRowIterator 
applyToPartition(UnfilteredRowIterator partition)
                 {
+                    pk = partition.partitionKey();
+
                     // The filter might be on static columns, so need to check 
static row first.
                     if (filterStaticColumns && 
applyToRow(partition.staticRow()) == null)
                         return null;
 
-                    pk = partition.partitionKey();
                     UnfilteredRowIterator iterator = 
Transformation.apply(partition, this);
-
                     return (filterNonStaticColumns && !iterator.hasNext()) ? 
null : iterator;
                 }
 
@@ -327,7 +340,7 @@ public abstract class RowFilter implements 
Iterable<RowFilter.Expression>
         private static final Serializer serializer = new Serializer();
 
         // Note: the order of this enum matter, it's used for serialization
-        protected enum Kind { SIMPLE, MAP_EQUALITY, THRIFT_DYN_EXPR, CUSTOM }
+        protected enum Kind { SIMPLE, MAP_EQUALITY, THRIFT_DYN_EXPR, CUSTOM, 
USER }
 
         protected abstract Kind kind();
         protected final ColumnDefinition column;
@@ -346,6 +359,11 @@ public abstract class RowFilter implements 
Iterable<RowFilter.Expression>
             return kind() == Kind.CUSTOM;
         }
 
+        public boolean isUserDefined()
+        {
+            return kind() == Kind.USER;
+        }
+
         public ColumnDefinition column()
         {
             return column;
@@ -468,6 +486,13 @@ public abstract class RowFilter implements 
Iterable<RowFilter.Expression>
                     return;
                 }
 
+                if (expression.kind() == Kind.USER)
+                {
+                    assert version >= MessagingService.VERSION_30;
+                    UserExpression.serialize((UserExpression)expression, out, 
version);
+                    return;
+                }
+
                 
ByteBufferUtil.writeWithShortLength(expression.column.name.bytes, out);
                 expression.operator.writeTo(out);
 
@@ -511,6 +536,11 @@ public abstract class RowFilter implements 
Iterable<RowFilter.Expression>
                                                     
IndexMetadata.serializer.deserialize(in, version, metadata),
                                                     
ByteBufferUtil.readWithShortLength(in));
                     }
+
+                    if (kind == Kind.USER)
+                    {
+                        return UserExpression.deserialize(in, version, 
metadata);
+                    }
                 }
 
                 name = ByteBufferUtil.readWithShortLength(in);
@@ -560,8 +590,11 @@ public abstract class RowFilter implements 
Iterable<RowFilter.Expression>
                 // version 3.0+ includes a byte for Kind
                 long size = version >= MessagingService.VERSION_30 ? 1 : 0;
 
-                // custom expressions don't include a column or operator, all 
other expressions do
-                if (expression.kind() != Kind.CUSTOM)
+                // Neither custom nor user expressions write a column or operator here,
+                // but all other expressions do. Custom and user expressions are 3.0+
+                // only, so the column & operator will always be the first things
+                // written for any pre-3.0 version
+                if (expression.kind() != Kind.CUSTOM && expression.kind() != 
Kind.USER)
                     size += 
ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes)
                             + expression.operator.serializedSize();
 
@@ -584,8 +617,11 @@ public abstract class RowFilter implements 
Iterable<RowFilter.Expression>
                     case CUSTOM:
                         if (version >= MessagingService.VERSION_30)
                             size += 
IndexMetadata.serializer.serializedSize(((CustomExpression)expression).targetIndex,
 version)
-                                  + 
ByteBufferUtil.serializedSizeWithShortLength(expression.value);
+                                   + 
ByteBufferUtil.serializedSizeWithShortLength(expression.value);
                         break;
+                    case USER:
+                        if (version >= MessagingService.VERSION_30)
+                            size += 
UserExpression.serializedSize((UserExpression)expression, version);
                 }
                 return size;
             }
@@ -920,6 +956,100 @@ public abstract class RowFilter implements 
Iterable<RowFilter.Expression>
         }
     }
 
+    /**
+     * A user defined filtering expression. These may be added to RowFilter programmatically by a
+     * QueryHandler implementation. No concrete implementations are provided and adding custom impls
+     * to the classpath is a task for operators (needless to say, this is something of a power
+     * user feature). Care must also be taken to register implementations via the static register
+     * method during system startup. An implementation and its corresponding Deserializer must be
+     * registered before sending or receiving any messages containing expressions of that type.
+     * Use of custom filtering expressions in a mixed version cluster should be handled with caution
+     * as the order in which types are registered is significant: identifiers are assigned in
+     * registration order, so if continuity of use during upgrades is important, new types should be
+     * registered last and obsoleted types should still be registered (or dummy implementations
+     * registered in their place) to preserve consistent identifiers across the cluster.
+     *
+     * During serialization, the identifier for the Deserializer implementation is prepended to the
+     * implementation-specific payload. To deserialize, the identifier is read first to obtain the
+     * Deserializer, which then provides the concrete expression instance.
+     */
+    public static abstract class UserExpression extends Expression
+    {
+        private static final DeserializerRegistry deserializers = new 
DeserializerRegistry();
+        private static final class DeserializerRegistry
+        {
+            private final AtomicInteger counter = new AtomicInteger(0);
+            private final ConcurrentMap<Integer, Deserializer> deserializers = 
new ConcurrentHashMap<>();
+            private final ConcurrentMap<Class<? extends UserExpression>, 
Integer> registeredClasses = new ConcurrentHashMap<>();
+
+            public void registerUserExpressionClass(Class<? extends 
UserExpression> expressionClass,
+                                                    
UserExpression.Deserializer deserializer)
+            {
+                int id = registeredClasses.computeIfAbsent(expressionClass, 
(cls) -> counter.getAndIncrement());
+                deserializers.put(id, deserializer);
+
+                logger.debug("Registered user defined expression type {} and 
serializer {} with identifier {}",
+                             expressionClass.getName(), 
deserializer.getClass().getName(), id);
+            }
+
+            public Integer getId(UserExpression expression)
+            {
+                return registeredClasses.get(expression.getClass());
+            }
+
+            public Deserializer getDeserializer(int id)
+            {
+                return deserializers.get(id);
+            }
+        }
+
+        protected static abstract class Deserializer
+        {
+            protected abstract UserExpression deserialize(DataInputPlus in,
+                                                          int version,
+                                                          CFMetaData metadata) 
throws IOException;
+        }
+
+        public static void register(Class<? extends UserExpression> 
expressionClass, Deserializer deserializer)
+        {
+            deserializers.registerUserExpressionClass(expressionClass, 
deserializer);
+        }
+
+        private static UserExpression deserialize(DataInputPlus in, int 
version, CFMetaData metadata) throws IOException
+        {
+            int id = in.readInt();
+            Deserializer deserializer = deserializers.getDeserializer(id);
+            assert deserializer != null : "No user defined expression type 
registered with id " + id;
+            return deserializer.deserialize(in, version, metadata);
+        }
+
+        private static void serialize(UserExpression expression, 
DataOutputPlus out, int version) throws IOException
+        {
+            Integer id = deserializers.getId(expression);
+            assert id != null : "User defined expression type " + 
expression.getClass().getName() + " is not registered";
+            out.writeInt(id);
+            expression.serialize(out, version);
+        }
+
+        private static long serializedSize(UserExpression expression, int 
version)
+        {   // 4 bytes for the expression type id
+            return 4 + expression.serializedSize(version);
+        }
+
+        protected UserExpression(ColumnDefinition column, Operator operator, 
ByteBuffer value)
+        {
+            super(column, operator, value);
+        }
+
+        protected Kind kind()
+        {
+            return Kind.USER;
+        }
+
+        protected abstract void serialize(DataOutputPlus out, int version) 
throws IOException;
+        protected abstract long serializedSize(int version);
+    }
+
     public static class Serializer
     {
         public void serialize(RowFilter filter, DataOutputPlus out, int 
version) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a73af76/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java 
b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index 09eabc7..cdb478c 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -632,7 +632,7 @@ public class SecondaryIndexManager implements IndexRegistry
                 Tracing.trace("Command contains a custom index expression, 
using target index {}", customExpression.getTargetIndex().name);
                 return indexes.get(customExpression.getTargetIndex().name);
             }
-            else
+            else if (!expression.isUserDefined())
             {
                 indexes.values().stream()
                        .filter(index -> 
index.supportsExpression(expression.column(), expression.operator()))

Reply via email to