Repository: cassandra Updated Branches: refs/heads/cassandra-3.0 c20566fa6 -> 0858bfbd7
Support counter-columns for native aggregates (sum,avg,max,min) patch by Robert Stupp; reviewed by Benjamin Lerer for CASSANDRA-9977 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8287ebcb Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8287ebcb Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8287ebcb Branch: refs/heads/cassandra-3.0 Commit: 8287ebcb6ad46529ca90600dc0c2f98ecab89cf0 Parents: ee36f14 Author: Robert Stupp <[email protected]> Authored: Thu Dec 24 14:05:16 2015 +0100 Committer: Robert Stupp <[email protected]> Committed: Thu Dec 24 14:05:16 2015 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/cql3/functions/AggregateFcts.java | 141 +++++++++++-------- .../cassandra/cql3/functions/Functions.java | 2 + .../cql3/validation/entities/UFTest.java | 26 ++++ .../validation/operations/AggregationTest.java | 42 ++++++ 5 files changed, 156 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/8287ebcb/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index fb0b151..c0fd4f6 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.2.5 + * Support counter-columns for native aggregates (sum,avg,max,min) (CASSANDRA-9977) * (cqlsh) show correct column names for empty result sets (CASSANDRA-9813) * Add new types to Stress (CASSANDRA-9556) * Add property to allow listening on broadcast interface (CASSANDRA-9748) http://git-wip-us.apache.org/repos/asf/cassandra/blob/8287ebcb/src/java/org/apache/cassandra/cql3/functions/AggregateFcts.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/functions/AggregateFcts.java b/src/java/org/apache/cassandra/cql3/functions/AggregateFcts.java index 41e43c0..77be525 100644 --- a/src/java/org/apache/cassandra/cql3/functions/AggregateFcts.java +++ b/src/java/org/apache/cassandra/cql3/functions/AggregateFcts.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.ByteType; +import org.apache.cassandra.db.marshal.CounterColumnType; import org.apache.cassandra.db.marshal.DecimalType; import org.apache.cassandra.db.marshal.DoubleType; import org.apache.cassandra.db.marshal.FloatType; @@ -480,31 +481,7 @@ public abstract class AggregateFcts { public Aggregate newAggregate() { - return new Aggregate() - { - private long sum; - - public void reset() - { - sum = 0; - } - - public ByteBuffer compute(int protocolVersion) - { - return ((LongType) returnType()).decompose(sum); - } - - public void addInput(int protocolVersion, List<ByteBuffer> values) - { - ByteBuffer value = values.get(0); - - if (value == null) - return; - - Number number = ((Number) argTypes().get(0).compose(value)); - sum += number.longValue(); - } - }; + return new LongSumAggregate(); } }; @@ -516,37 +493,7 @@ public abstract class AggregateFcts { public Aggregate newAggregate() { - return new Aggregate() - { - private long sum; - - private int count; - - public void reset() - { - count = 0; - sum = 0; - } - - public ByteBuffer compute(int protocolVersion) - { - long avg = count == 0 ? 0 : sum / count; - - return ((LongType) returnType()).decompose(avg); - } - - public void addInput(int protocolVersion, List<ByteBuffer> values) - { - ByteBuffer value = values.get(0); - - if (value == null) - return; - - count++; - Number number = ((Number) argTypes().get(0).compose(value)); - sum += number.longValue(); - } - }; + return new LongAvgAggregate(); } }; @@ -707,6 +654,30 @@ public abstract class AggregateFcts }; /** + * The SUM function for counter column values. + */ + public static final AggregateFunction sumFunctionForCounter = + new NativeAggregateFunction("sum", CounterColumnType.instance, CounterColumnType.instance) + { + public Aggregate newAggregate() + { + return new LongSumAggregate(); + } + }; + + /** + * AVG function for counter column values. + */ + public static final AggregateFunction avgFunctionForCounter = + new NativeAggregateFunction("avg", CounterColumnType.instance, CounterColumnType.instance) + { + public Aggregate newAggregate() + { + return new LongAvgAggregate(); + } + }; + + /** * Creates a MAX function for the specified type. * * @param inputType the function input and output type @@ -827,4 +798,62 @@ public abstract class AggregateFcts } }; } + + private static class LongSumAggregate implements AggregateFunction.Aggregate + { + private long sum; + + public void reset() + { + sum = 0; + } + + public ByteBuffer compute(int protocolVersion) + { + return LongType.instance.decompose(sum); + } + + public void addInput(int protocolVersion, List<ByteBuffer> values) + { + ByteBuffer value = values.get(0); + + if (value == null) + return; + + Number number = LongType.instance.compose(value); + sum += number.longValue(); + } + } + + private static class LongAvgAggregate implements AggregateFunction.Aggregate + { + private long sum; + + private int count; + + public void reset() + { + count = 0; + sum = 0; + } + + public ByteBuffer compute(int protocolVersion) + { + long avg = count == 0 ? 0 : sum / count; + + return LongType.instance.decompose(avg); + } + + public void addInput(int protocolVersion, List<ByteBuffer> values) + { + ByteBuffer value = values.get(0); + + if (value == null) + return; + + count++; + Number number = LongType.instance.compose(value); + sum += number.longValue(); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/8287ebcb/src/java/org/apache/cassandra/cql3/functions/Functions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/functions/Functions.java b/src/java/org/apache/cassandra/cql3/functions/Functions.java index e31fc9f..0f1af19 100644 --- a/src/java/org/apache/cassandra/cql3/functions/Functions.java +++ b/src/java/org/apache/cassandra/cql3/functions/Functions.java @@ -91,6 +91,7 @@ public abstract class Functions declare(AggregateFcts.sumFunctionForDouble); declare(AggregateFcts.sumFunctionForDecimal); declare(AggregateFcts.sumFunctionForVarint); + declare(AggregateFcts.sumFunctionForCounter); declare(AggregateFcts.avgFunctionForByte); declare(AggregateFcts.avgFunctionForShort); declare(AggregateFcts.avgFunctionForInt32); @@ -99,6 +100,7 @@ public abstract class Functions declare(AggregateFcts.avgFunctionForDouble); declare(AggregateFcts.avgFunctionForVarint); declare(AggregateFcts.avgFunctionForDecimal); + declare(AggregateFcts.avgFunctionForCounter); MigrationManager.instance.register(new FunctionsMigrationListener()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/8287ebcb/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java index 0d11a82..bcfe871 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java @@ -716,6 +716,32 @@ public class UFTest extends CQLTester } @Test + public void testJavaFunctionCounter() throws Throwable + { + createTable("CREATE TABLE %s (key int primary key, val counter)"); + + String fName = createFunction(KEYSPACE, "counter", + "CREATE OR REPLACE FUNCTION %s(val counter) " + + "CALLED ON NULL INPUT " + + "RETURNS bigint " + + "LANGUAGE JAVA " + + "AS 'return val + 1;';"); + + execute("UPDATE %s SET val = val + 1 WHERE key = 1"); + assertRows(execute("SELECT key, val, " + fName + "(val) FROM %s"), + row(1, 1L, 2L)); + execute("UPDATE %s SET val = val + 1 WHERE key = 1"); + assertRows(execute("SELECT key, val, " + fName + "(val) FROM %s"), + row(1, 2L, 3L)); + execute("UPDATE %s SET val = val + 2 WHERE key = 1"); + assertRows(execute("SELECT key, val, " + fName + "(val) FROM %s"), + row(1, 4L, 5L)); + execute("UPDATE %s SET val = val - 2 WHERE key = 1"); + assertRows(execute("SELECT key, val, " + fName + "(val) FROM %s"), + row(1, 2L, 3L)); + } + + @Test public void testFunctionInTargetKeyspace() throws Throwable { createTable("CREATE TABLE %s (key int primary key, val double)"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/8287ebcb/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java index e661b4f..0e0313c 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java @@ -37,6 +37,7 @@ import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.exceptions.FunctionExecutionException; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.transport.Event; import org.apache.cassandra.transport.messages.ResultMessage; @@ -172,6 +173,47 @@ public class AggregationTest extends CQLTester } @Test + public void testAggregateOnCounters() throws Throwable + { + createTable("CREATE TABLE %s (a int, b counter, primary key (a))"); + + // Test with empty table + assertColumnNames(execute("SELECT count(b), max(b) as max, b FROM %s"), + "system.count(b)", "max", "b"); + assertRows(execute("SELECT count(b), max(b) as max, b FROM %s"), + row(0L, null, null)); + + execute("UPDATE %s SET b = b + 1 WHERE a = 1"); + execute("UPDATE %s SET b = b + 1 WHERE a = 1"); + + assertRows(execute("SELECT count(b), max(b) as max, min(b) as min, avg(b) as avg, sum(b) as sum FROM %s"), + row(1L, 2L, 2L, 2L, 2L)); + flush(); + assertRows(execute("SELECT count(b), max(b) as max, min(b) as min, avg(b) as avg, sum(b) as sum FROM %s"), + row(1L, 2L, 2L, 2L, 2L)); + + execute("UPDATE %s SET b = b + 2 WHERE a = 1"); + + assertRows(execute("SELECT count(b), max(b) as max, min(b) as min, avg(b) as avg, sum(b) as sum FROM %s"), + row(1L, 4L, 4L, 4L, 4L)); + + execute("UPDATE %s SET b = b - 2 WHERE a = 1"); + + assertRows(execute("SELECT count(b), max(b) as max, min(b) as min, avg(b) as avg, sum(b) as sum FROM %s"), + row(1L, 2L, 2L, 2L, 2L)); + flush(); + assertRows(execute("SELECT count(b), max(b) as max, min(b) as min, avg(b) as avg, sum(b) as sum FROM %s"), + row(1L, 2L, 2L, 2L, 2L)); + + execute("UPDATE %s SET b = b + 1 WHERE a = 2"); + execute("UPDATE %s SET b = b + 1 WHERE a = 2"); + execute("UPDATE %s SET b = b + 2 WHERE a = 2"); + + assertRows(execute("SELECT count(b), max(b) as max, min(b) as min, avg(b) as avg, sum(b) as sum FROM %s"), + row(2L, 4L, 2L, 3L, 6L)); + } + + @Test public void testAggregateWithUdtFields() throws Throwable { String myType = createType("CREATE TYPE %s (x int)");
