Merge branch 'cassandra-2.1' into trunk
Conflicts:
src/java/org/apache/cassandra/cql3/statements/Selection.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/45084f18
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/45084f18
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/45084f18
Branch: refs/heads/trunk
Commit: 45084f182a46234243b94059fd1b6b53e927ead8
Parents: 708c6ba 1d285ea
Author: Sylvain Lebresne <[email protected]>
Authored: Wed Oct 29 10:49:20 2014 +0100
Committer: Sylvain Lebresne <[email protected]>
Committed: Wed Oct 29 10:49:20 2014 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
doc/native_protocol_v3.spec | 8 +++--
.../org/apache/cassandra/cql3/QueryOptions.java | 10 +++---
.../apache/cassandra/cql3/UpdateParameters.java | 6 ++++
.../cassandra/cql3/selection/Selection.java | 2 +-
.../cql3/selection/WritetimeOrTTLSelector.java | 4 +--
.../apache/cassandra/cql3/TimestampTest.java | 36 ++++++++++++++++++++
7 files changed, 56 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/45084f18/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/45084f18/src/java/org/apache/cassandra/cql3/selection/Selection.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/selection/Selection.java
index 67cce72,0000000..cd5e2a8
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/selection/Selection.java
+++ b/src/java/org/apache/cassandra/cql3/selection/Selection.java
@@@ -1,390 -1,0 +1,390 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.selection;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.ColumnSpecification;
+import org.apache.cassandra.cql3.ResultSet;
+import org.apache.cassandra.db.Cell;
+import org.apache.cassandra.db.CounterCell;
+import org.apache.cassandra.db.ExpiringCell;
+import org.apache.cassandra.db.context.CounterContext;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import com.google.common.collect.Iterators;
+
+public abstract class Selection
+{
+ private final Collection<ColumnDefinition> columns;
+ private final ResultSet.Metadata metadata;
+ private final boolean collectTimestamps;
+ private final boolean collectTTLs;
+
+ protected Selection(Collection<ColumnDefinition> columns,
List<ColumnSpecification> metadata, boolean collectTimestamps, boolean
collectTTLs)
+ {
+ this.columns = columns;
+ this.metadata = new ResultSet.Metadata(metadata);
+ this.collectTimestamps = collectTimestamps;
+ this.collectTTLs = collectTTLs;
+ }
+
+ // Overriden by SimpleSelection when appropriate.
+ public boolean isWildcard()
+ {
+ return false;
+ }
+
+ public ResultSet.Metadata getResultMetadata()
+ {
+ return metadata;
+ }
+
+ public static Selection wildcard(CFMetaData cfm)
+ {
+ List<ColumnDefinition> all = new
ArrayList<ColumnDefinition>(cfm.allColumns().size());
+ Iterators.addAll(all, cfm.allColumnsInSelectOrder());
+ return new SimpleSelection(all, true);
+ }
+
+ public static Selection forColumns(Collection<ColumnDefinition> columns)
+ {
+ return new SimpleSelection(columns, false);
+ }
+
+ public int addColumnForOrdering(ColumnDefinition c)
+ {
+ columns.add(c);
+ metadata.addNonSerializedColumn(c);
+ return columns.size() - 1;
+ }
+
+ private static boolean isUsingFunction(List<RawSelector> rawSelectors)
+ {
+ for (RawSelector rawSelector : rawSelectors)
+ {
+ if (!(rawSelector.selectable instanceof ColumnIdentifier))
+ return true;
+ }
+ return false;
+ }
+
+ public static Selection fromSelectors(CFMetaData cfm, List<RawSelector>
rawSelectors) throws InvalidRequestException
+ {
+ List<ColumnDefinition> defs = new ArrayList<ColumnDefinition>();
+
+ SelectorFactories factories =
+
SelectorFactories.createFactoriesAndCollectColumnDefinitions(RawSelector.toSelectables(rawSelectors),
cfm, defs);
+ List<ColumnSpecification> metadata = collectMetadata(cfm,
rawSelectors, factories);
+
+ return isUsingFunction(rawSelectors) ? new
SelectionWithFunctions(defs, metadata, factories)
+ : new SimpleSelection(defs,
metadata, false);
+ }
+
+ private static List<ColumnSpecification> collectMetadata(CFMetaData cfm,
+
List<RawSelector> rawSelectors,
+
SelectorFactories factories)
+ {
+ List<ColumnSpecification> metadata = new
ArrayList<ColumnSpecification>(rawSelectors.size());
+ Iterator<RawSelector> iter = rawSelectors.iterator();
+ for (Selector.Factory factory : factories)
+ {
+ ColumnSpecification colSpec = factory.getColumnSpecification(cfm);
+ ColumnIdentifier alias = iter.next().alias;
+ metadata.add(alias == null ? colSpec : colSpec.withAlias(alias));
+ }
+ return metadata;
+ }
+
+ protected abstract Selectors newSelectors();
+
+ /**
+ * @return the list of CQL3 columns value this SelectionClause needs.
+ */
+ public Collection<ColumnDefinition> getColumns()
+ {
+ return columns;
+ }
+
+ public ResultSetBuilder resultSetBuilder(long now)
+ {
+ return new ResultSetBuilder(now);
+ }
+
+ public abstract boolean isAggregate();
+
+ /**
+ * Checks that selectors are either all aggregates or that none of them
is.
+ *
+ * @param selectors the selectors to test.
+ * @param msgTemplate the error message template
+ * @param messageArgs the error message arguments
+ * @throws InvalidRequestException if some of the selectors are aggregate
but not all of them
+ */
+ static void validateSelectors(List<Selector> selectors, String
messageTemplate, Object... messageArgs)
+ throws InvalidRequestException
+ {
+ int aggregates = 0;
+ for (Selector s : selectors)
+ if (s.isAggregate())
+ ++aggregates;
+
+ if (aggregates != 0 && aggregates != selectors.size())
+ throw new InvalidRequestException(String.format(messageTemplate,
messageArgs));
+ }
+
+ public class ResultSetBuilder
+ {
+ private final ResultSet resultSet;
+
+ /**
+ * As multiple thread can access a <code>Selection</code> instance
each <code>ResultSetBuilder</code> will use
+ * its own <code>Selectors</code> instance.
+ */
+ private final Selectors selectors;
+
+ /*
+ * We'll build CQL3 row one by one.
+ * The currentRow is the values for the (CQL3) columns we've fetched.
+ * We also collect timestamps and ttls for the case where the
writetime and
+ * ttl functions are used. Note that we might collect timestamp
and/or ttls
+ * we don't care about, but since the array below are allocated just
once,
+ * it doesn't matter performance wise.
+ */
+ List<ByteBuffer> current;
+ final long[] timestamps;
+ final int[] ttls;
+ final long now;
+
+ private ResultSetBuilder(long now)
+ {
+ this.resultSet = new ResultSet(getResultMetadata().copy(), new
ArrayList<List<ByteBuffer>>());
+ this.selectors = newSelectors();
+ this.timestamps = collectTimestamps ? new long[columns.size()] :
null;
+ this.ttls = collectTTLs ? new int[columns.size()] : null;
+ this.now = now;
+ }
+
+ public void add(ByteBuffer v)
+ {
+ current.add(v);
+ }
+
+ public void add(Cell c)
+ {
+ current.add(isDead(c) ? null : value(c));
+ if (timestamps != null)
+ {
- timestamps[current.size() - 1] = isDead(c) ? -1 :
c.timestamp();
++ timestamps[current.size() - 1] = isDead(c) ? Long.MIN_VALUE :
c.timestamp();
+ }
+ if (ttls != null)
+ {
+ int ttl = -1;
+ if (!isDead(c) && c instanceof ExpiringCell)
+ ttl = c.getLocalDeletionTime() - (int) (now / 1000);
+ ttls[current.size() - 1] = ttl;
+ }
+ }
+
+ private boolean isDead(Cell c)
+ {
+ return c == null || !c.isLive(now);
+ }
+
+ public void newRow() throws InvalidRequestException
+ {
+ if (current != null)
+ {
+ selectors.addInputRow(this);
+ if (!selectors.isAggregate())
+ {
+ resultSet.addRow(selectors.getOutputRow());
+ selectors.reset();
+ }
+ }
+ current = new ArrayList<ByteBuffer>(columns.size());
+ }
+
+ public ResultSet build() throws InvalidRequestException
+ {
+ if (current != null)
+ {
+ selectors.addInputRow(this);
+ resultSet.addRow(selectors.getOutputRow());
+ selectors.reset();
+ current = null;
+ }
+ return resultSet;
+ }
+
+ private ByteBuffer value(Cell c)
+ {
+ return (c instanceof CounterCell)
+ ?
ByteBufferUtil.bytes(CounterContext.instance().total(c.value()))
+ : c.value();
+ }
+ }
+
+ private static interface Selectors
+ {
+ public boolean isAggregate();
+
+ /**
+ * Adds the current row of the specified
<code>ResultSetBuilder</code>.
+ *
+ * @param rs the <code>ResultSetBuilder</code>
+ * @throws InvalidRequestException
+ */
+ public void addInputRow(ResultSetBuilder rs) throws
InvalidRequestException;
+
+ public List<ByteBuffer> getOutputRow() throws InvalidRequestException;
+
+ public void reset();
+ }
+
+ // Special cased selection for when no function is used (this save some
allocations).
+ private static class SimpleSelection extends Selection
+ {
+ private final boolean isWildcard;
+
+ public SimpleSelection(Collection<ColumnDefinition> columns, boolean
isWildcard)
+ {
+ this(columns, new ArrayList<ColumnSpecification>(columns),
isWildcard);
+ }
+
+ public SimpleSelection(Collection<ColumnDefinition> columns,
List<ColumnSpecification> metadata, boolean isWildcard)
+ {
+ /*
+ * In theory, even a simple selection could have multiple time
the same column, so we
+ * could filter those duplicate out of columns. But since we're
very unlikely to
+ * get much duplicate in practice, it's more efficient not to
bother.
+ */
+ super(columns, metadata, false, false);
+ this.isWildcard = isWildcard;
+ }
+
+ @Override
+ public boolean isWildcard()
+ {
+ return isWildcard;
+ }
+
+ public boolean isAggregate()
+ {
+ return false;
+ }
+
+ protected Selectors newSelectors()
+ {
+ return new Selectors()
+ {
+ private List<ByteBuffer> current;
+
+ public void reset()
+ {
+ current = null;
+ }
+
+ public List<ByteBuffer> getOutputRow()
+ {
+ return current;
+ }
+
+ public void addInputRow(ResultSetBuilder rs) throws
InvalidRequestException
+ {
+ current = rs.current;
+ }
+
+ public boolean isAggregate()
+ {
+ return false;
+ }
+ };
+ }
+ }
+
+ private static class SelectionWithFunctions extends Selection
+ {
+ private final SelectorFactories factories;
+
+ public SelectionWithFunctions(Collection<ColumnDefinition> columns,
+ List<ColumnSpecification> metadata,
+ SelectorFactories factories) throws
InvalidRequestException
+ {
+ super(columns, metadata,
factories.containsWritetimeSelectorFactory(),
factories.containsTTLSelectorFactory());
+ this.factories = factories;
+
+ if (factories.doesAggregation() &&
!factories.containsOnlyAggregateFunctions())
+ throw new InvalidRequestException("the select clause must
either contains only aggregates or none");
+ }
+
+ public boolean isAggregate()
+ {
+ return factories.containsOnlyAggregateFunctions();
+ }
+
+ protected Selectors newSelectors()
+ {
+ return new Selectors()
+ {
+ private final List<Selector> selectors =
factories.newInstances();
+
+ public void reset()
+ {
+ for (int i = 0, m = selectors.size(); i < m; i++)
+ {
+ selectors.get(i).reset();
+ }
+ }
+
+ public boolean isAggregate()
+ {
+ return factories.containsOnlyAggregateFunctions();
+ }
+
+ public List<ByteBuffer> getOutputRow() throws
InvalidRequestException
+ {
+ List<ByteBuffer> outputRow = new
ArrayList<>(selectors.size());
+
+ for (int i = 0, m = selectors.size(); i < m; i++)
+ {
+ outputRow.add(selectors.get(i).getOutput());
+ }
+ return outputRow;
+ }
+
+ public void addInputRow(ResultSetBuilder rs) throws
InvalidRequestException
+ {
+ for (int i = 0, m = selectors.size(); i < m; i++)
+ {
+ selectors.get(i).addInput(rs);
+ }
+ }
+ };
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/45084f18/src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java
----------------------------------------------------------------------
diff --cc
src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java
index 6d6edd3,0000000..a57a3ca
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java
@@@ -1,110 -1,0 +1,110 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.selection;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.ColumnSpecification;
+import org.apache.cassandra.cql3.selection.Selection.ResultSetBuilder;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+final class WritetimeOrTTLSelector extends Selector
+{
+ private final String columnName;
+ private final int idx;
+ private final boolean isWritetime;
+ private ByteBuffer current;
+
+ public static Factory newFactory(final String columnName, final int idx,
final boolean isWritetime)
+ {
+ return new Factory()
+ {
+ public ColumnSpecification getColumnSpecification(CFMetaData cfm)
+ {
+ String text = String.format("%s(%s)", isWritetime ?
"writetime" : "ttl", columnName);
+ return new ColumnSpecification(cfm.ksName,
+ cfm.cfName,
+ new ColumnIdentifier(text,
true),
+ isWritetime ?
LongType.instance : Int32Type.instance);
+ }
+
+ public Selector newInstance()
+ {
+ return new WritetimeOrTTLSelector(columnName, idx,
isWritetime);
+ }
+
+ public boolean isWritetimeSelectorFactory()
+ {
+ return isWritetime;
+ }
+
+ public boolean isTTLSelectorFactory()
+ {
+ return !isWritetime;
+ }
+ };
+ }
+
+ public void addInput(ResultSetBuilder rs)
+ {
+ if (isWritetime)
+ {
+ long ts = rs.timestamps[idx];
- current = ts >= 0 ? ByteBufferUtil.bytes(ts) : null;
++ current = ts != Long.MIN_VALUE ? ByteBufferUtil.bytes(ts) : null;
+ }
+ else
+ {
+ int ttl = rs.ttls[idx];
+ current = ttl > 0 ? ByteBufferUtil.bytes(ttl) : null;
+ }
+ }
+
+ public ByteBuffer getOutput()
+ {
+ return current;
+ }
+
+ public void reset()
+ {
+ current = null;
+ }
+
+ public AbstractType<?> getType()
+ {
+ return isWritetime ? LongType.instance : Int32Type.instance;
+ }
+
+ @Override
+ public String toString()
+ {
+ return columnName;
+ }
+
+ private WritetimeOrTTLSelector(String columnName, int idx, boolean
isWritetime)
+ {
+ this.columnName = columnName;
+ this.idx = idx;
+ this.isWritetime = isWritetime;
+ }
+
- }
++}