Updated Branches: refs/heads/trunk fac3042db -> c8a0a3a68
Pre-6504 cleanup and fixups patch by Aleksey Yeschenko; reviewed by Sylvain Lebresne for CASSANDRA-6504 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c8a0a3a6 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c8a0a3a6 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c8a0a3a6 Branch: refs/heads/trunk Commit: c8a0a3a689d965af8de683f2d831f2c422105670 Parents: fac3042 Author: Aleksey Yeschenko <[email protected]> Authored: Fri Jan 17 03:41:20 2014 +0300 Committer: Aleksey Yeschenko <[email protected]> Committed: Fri Jan 17 03:41:20 2014 +0300 ---------------------------------------------------------------------- conf/cassandra.yaml | 2 +- .../org/apache/cassandra/config/CFMetaData.java | 5 ++ .../org/apache/cassandra/config/Config.java | 18 +++---- .../cql3/statements/CreateIndexStatement.java | 2 +- .../cql3/statements/ModificationStatement.java | 2 +- src/java/org/apache/cassandra/db/Cell.java | 14 ++---- .../org/apache/cassandra/db/ColumnFamily.java | 2 +- .../org/apache/cassandra/db/CounterCell.java | 44 ++++++----------- .../apache/cassandra/db/CounterMutation.java | 12 ++--- .../db/CounterMutationVerbHandler.java | 9 ++-- .../apache/cassandra/db/CounterUpdateCell.java | 7 +++ .../org/apache/cassandra/db/ExpiringCell.java | 6 +-- .../db/compaction/CompactionManager.java | 2 +- .../cassandra/db/compaction/Scrubber.java | 2 +- .../db/marshal/AbstractCommutativeType.java | 50 -------------------- .../cassandra/db/marshal/AbstractType.java | 8 +--- .../cassandra/db/marshal/CounterColumnType.java | 30 +++++++----- .../apache/cassandra/service/StorageProxy.java | 21 ++++---- .../cassandra/thrift/CassandraServer.java | 4 +- .../cassandra/thrift/ThriftValidation.java | 8 ++-- 20 files changed, 91 insertions(+), 157 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8a0a3a6/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index 1f9fd8b..2f1c8fa 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -157,7 +157,7 @@ key_cache_save_period: 14400 row_cache_size_in_mb: 0 # Duration in seconds after which Cassandra should -# safe the row cache. Caches are saved to saved_caches_directory as specified +# save the row cache. Caches are saved to saved_caches_directory as specified # in this configuration file. # # Saved caches greatly improve cold-start speeds, and is relatively cheap in http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8a0a3a6/src/java/org/apache/cassandra/config/CFMetaData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java index cdc4cdb..817d4a3 100644 --- a/src/java/org/apache/cassandra/config/CFMetaData.java +++ b/src/java/org/apache/cassandra/config/CFMetaData.java @@ -2149,6 +2149,11 @@ public final class CFMetaData return true; } + public boolean isCounter() + { + return defaultValidator.isCounter(); + } + public void validateColumns(Iterable<Cell> columns) { for (Cell cell : columns) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8a0a3a6/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 5c737d4..2ea8e38 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -50,25 +50,25 @@ public class Config public String initial_token; public Integer num_tokens = 1; - public volatile Long request_timeout_in_ms = new Long(10000); + public volatile Long request_timeout_in_ms = 10000L; - public Long read_request_timeout_in_ms = new Long(5000); + public volatile Long read_request_timeout_in_ms = 5000L; - public Long range_request_timeout_in_ms = new Long(10000); + public volatile Long range_request_timeout_in_ms = 10000L; - public Long write_request_timeout_in_ms = new Long(2000); + public volatile Long write_request_timeout_in_ms = 2000L; - public Long cas_contention_timeout_in_ms = new Long(1000); + public volatile Long cas_contention_timeout_in_ms = 1000L; - public Long truncate_request_timeout_in_ms = new Long(60000); + public volatile Long truncate_request_timeout_in_ms = 60000L; - public Integer streaming_socket_timeout_in_ms = new Integer(0); + public Integer streaming_socket_timeout_in_ms = 0; public boolean cross_node_timeout = false; public volatile Double phi_convict_threshold = 8.0; - public Integer concurrent_reads = 8; + public Integer concurrent_reads = 32; public Integer concurrent_writes = 32; public Integer concurrent_replicates = 32; @@ -161,7 +161,7 @@ public class Config public Long key_cache_size_in_mb = null; public volatile int key_cache_save_period = 14400; - public int key_cache_keys_to_save = Integer.MAX_VALUE; + public volatile int key_cache_keys_to_save = Integer.MAX_VALUE; public long row_cache_size_in_mb = 0; public volatile int row_cache_save_period = 0; http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8a0a3a6/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java index ca43d20..d0478f5 100644 --- a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java @@ -69,7 +69,7 @@ public class CreateIndexStatement extends SchemaAlteringStatement public void validate(ClientState state) throws RequestValidationException { CFMetaData cfm = ThriftValidation.validateColumnFamily(keyspace(), columnFamily()); - if (cfm.getDefaultValidator().isCommutative()) + if (cfm.isCounter()) throw new InvalidRequestException("Secondary indexes are not supported on counter tables"); ColumnDefinition cd = cfm.getColumnDefinition(target.column); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8a0a3a6/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index 148edda..c2a0080 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -91,7 +91,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF public boolean isCounter() { - return cfm.getDefaultValidator().isCommutative(); + return cfm.isCounter(); } public long getTimestamp(long now, List<ByteBuffer> variables) throws InvalidRequestException http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8a0a3a6/src/java/org/apache/cassandra/db/Cell.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Cell.java b/src/java/org/apache/cassandra/db/Cell.java index 537a744..3e04f9b 100644 --- a/src/java/org/apache/cassandra/db/Cell.java +++ b/src/java/org/apache/cassandra/db/Cell.java @@ -268,15 +268,11 @@ public class Cell implements OnDiskAtom public String getString(CellNameType comparator) { - StringBuilder sb = new StringBuilder(); - sb.append(comparator.getString(name)); - sb.append(":"); - sb.append(isMarkedForDelete(System.currentTimeMillis())); - sb.append(":"); - sb.append(value.remaining()); - sb.append("@"); - sb.append(timestamp()); - return sb.toString(); + return String.format("%s:%b:%d@%d", + comparator.getString(name), + isMarkedForDelete(System.currentTimeMillis()), + value.remaining(), + timestamp); } protected void validateName(CFMetaData metadata) throws MarshalException http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8a0a3a6/src/java/org/apache/cassandra/db/ColumnFamily.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java index 2ea60f1..9ce6b0c 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamily.java +++ b/src/java/org/apache/cassandra/db/ColumnFamily.java @@ -125,7 +125,7 @@ public abstract class ColumnFamily implements Iterable<Cell>, IRowCacheEntry public void addColumn(CellName name, ByteBuffer value, long timestamp, int timeToLive) { - assert !metadata().getDefaultValidator().isCommutative(); + assert !metadata().isCounter(); Cell cell = Cell.create(name, value, timestamp, timeToLive, metadata()); addColumn(cell); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8a0a3a6/src/java/org/apache/cassandra/db/CounterCell.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CounterCell.java b/src/java/org/apache/cassandra/db/CounterCell.java index 0a1c992..76949d4 100644 --- a/src/java/org/apache/cassandra/db/CounterCell.java +++ b/src/java/org/apache/cassandra/db/CounterCell.java @@ -21,9 +21,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.security.MessageDigest; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.composites.CellName; import org.apache.cassandra.db.composites.CellNameType; @@ -39,8 +36,6 @@ import org.apache.cassandra.utils.*; */ public class CounterCell extends Cell { - private static final Logger logger = LoggerFactory.getLogger(CounterCell.class); - protected static final CounterContext contextManager = CounterContext.instance(); private final long timestampOfLastDelete; @@ -92,10 +87,7 @@ public class CounterCell extends Cell @Override public int dataSize() { - /* - * A counter column adds to a Cell : - * + 8 bytes for timestampOfLastDelete - */ + // A counter column adds 8 bytes for timestampOfLastDelete to Cell. return super.dataSize() + TypeSizes.NATIVE.sizeof(timestampOfLastDelete); } @@ -157,8 +149,6 @@ public class CounterCell extends Cell @Override public Cell reconcile(Cell cell, Allocator allocator) { - assert (cell instanceof CounterCell) || (cell instanceof DeletedCell) : "Wrong class type: " + cell.getClass(); - // live + tombstone: track last tombstone if (cell.isMarkedForDelete(Long.MIN_VALUE)) // cannot be an expired cell, so the current time is irrelevant { @@ -175,6 +165,9 @@ public class CounterCell extends Cell // live last delete < tombstone return new CounterCell(name(), value(), timestamp(), cell.timestamp()); } + + assert cell instanceof CounterCell : "Wrong class type: " + cell.getClass(); + // live < live last delete if (timestamp() < ((CounterCell) cell).timestampOfLastDelete()) return cell; @@ -182,11 +175,10 @@ public class CounterCell extends Cell if (timestampOfLastDelete() > cell.timestamp()) return this; // live + live: merge clocks; update value - return new CounterCell( - name(), - contextManager.merge(value(), cell.value(), allocator), - Math.max(timestamp(), cell.timestamp()), - Math.max(timestampOfLastDelete(), ((CounterCell) cell).timestampOfLastDelete())); + return new CounterCell(name(), + contextManager.merge(value(), cell.value(), allocator), + Math.max(timestamp(), cell.timestamp()), + Math.max(timestampOfLastDelete(), ((CounterCell) cell).timestampOfLastDelete())); } @Override @@ -199,9 +191,7 @@ public class CounterCell extends Cell @Override public int hashCode() { - int result = super.hashCode(); - result = 31 * result + (int)(timestampOfLastDelete ^ (timestampOfLastDelete >>> 32)); - return result; + return 31 * super.hashCode() + (int)(timestampOfLastDelete ^ (timestampOfLastDelete >>> 32)); } @Override @@ -219,17 +209,11 @@ public class CounterCell extends Cell @Override public String getString(CellNameType comparator) { - StringBuilder sb = new StringBuilder(); - sb.append(comparator.getString(name)); - sb.append(":"); - sb.append(false); - sb.append(":"); - sb.append(contextManager.toString(value)); - sb.append("@"); - sb.append(timestamp()); - sb.append("!"); - sb.append(timestampOfLastDelete); - return sb.toString(); + return String.format("%s:false:%s@%d!%d", + comparator.getString(name), + contextManager.toString(value), + timestamp, + timestampOfLastDelete); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8a0a3a6/src/java/org/apache/cassandra/db/CounterMutation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java index a07dd9b..7dcb05c 100644 --- a/src/java/org/apache/cassandra/db/CounterMutation.java +++ b/src/java/org/apache/cassandra/db/CounterMutation.java @@ -78,7 +78,7 @@ public class CounterMutation implements IMutation public Mutation makeReplicationMutation() { - List<ReadCommand> readCommands = new LinkedList<ReadCommand>(); + List<ReadCommand> readCommands = new LinkedList<>(); long timestamp = System.currentTimeMillis(); for (ColumnFamily columnFamily : mutation.getColumnFamilies()) { @@ -111,7 +111,7 @@ public class CounterMutation implements IMutation public MessageOut<CounterMutation> makeMutationMessage() { - return new MessageOut<CounterMutation>(MessagingService.Verb.COUNTER_MUTATION, this, serializer); + return new MessageOut<>(MessagingService.Verb.COUNTER_MUTATION, this, serializer); } public boolean shouldReplicateOnWrite() @@ -133,9 +133,7 @@ public class CounterMutation implements IMutation ColumnFamily cf = cf_.cloneMeShallow(); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cf.id()); for (Cell cell : cf_) - { cf.addColumn(cell.localCopy(cfs), HeapAllocator.instance); - } m.add(cf); } m.apply(); @@ -145,7 +143,6 @@ public class CounterMutation implements IMutation { if (!(m instanceof CounterMutation)) throw new IllegalArgumentException(); - CounterMutation cm = (CounterMutation)m; mutation.addAll(cm.mutation); } @@ -158,10 +155,7 @@ public class CounterMutation implements IMutation public String toString(boolean shallow) { - StringBuilder buff = new StringBuilder("CounterMutation("); - buff.append(mutation.toString(shallow)); - buff.append(", ").append(consistency.toString()); - return buff.append(")").toString(); + return String.format("CounterMutation(%s, %s)", mutation.toString(shallow), consistency); } public static class CounterMutationSerializer implements IVersionedSerializer<CounterMutation> http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8a0a3a6/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java index 966a015..d65fbd7 100644 --- a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java +++ b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java @@ -37,8 +37,7 @@ public class CounterMutationVerbHandler implements IVerbHandler<CounterMutation> try { final CounterMutation cm = message.payload; - if (logger.isDebugEnabled()) - logger.debug("Applying forwarded {}", cm); + logger.debug("Applying forwarded {}", cm); String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress()); // We should not wait for the result of the write in this thread, @@ -48,11 +47,11 @@ public class CounterMutationVerbHandler implements IVerbHandler<CounterMutation> // will not be called if the request timeout, but this is ok // because the coordinator of the counter mutation will timeout on // it's own in that case. - StorageProxy.applyCounterMutationOnLeader(cm, localDataCenter, new Runnable(){ + StorageProxy.applyCounterMutationOnLeader(cm, localDataCenter, new Runnable() + { public void run() { - WriteResponse response = new WriteResponse(); - MessagingService.instance().sendReply(response.createMessage(), id, message.from); + MessagingService.instance().sendReply(new WriteResponse().createMessage(), id, message.from); } }); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8a0a3a6/src/java/org/apache/cassandra/db/CounterUpdateCell.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CounterUpdateCell.java b/src/java/org/apache/cassandra/db/CounterUpdateCell.java index f6bb3d4..dd2bf2a 100644 --- a/src/java/org/apache/cassandra/db/CounterUpdateCell.java +++ b/src/java/org/apache/cassandra/db/CounterUpdateCell.java @@ -20,6 +20,7 @@ package org.apache.cassandra.db; import java.nio.ByteBuffer; import org.apache.cassandra.db.composites.CellName; +import org.apache.cassandra.db.composites.CellNameType; import org.apache.cassandra.db.context.CounterContext; import org.apache.cassandra.utils.Allocator; import org.apache.cassandra.utils.ByteBufferUtil; @@ -96,4 +97,10 @@ public class CounterUpdateCell extends Cell timestamp(), Long.MIN_VALUE); } + + @Override + public String getString(CellNameType comparator) + { + return String.format("%s:%s@%d", comparator.getString(name), ByteBufferUtil.toLong(value), timestamp); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8a0a3a6/src/java/org/apache/cassandra/db/ExpiringCell.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ExpiringCell.java b/src/java/org/apache/cassandra/db/ExpiringCell.java index 2b9541c..b15514e 100644 --- a/src/java/org/apache/cassandra/db/ExpiringCell.java +++ b/src/java/org/apache/cassandra/db/ExpiringCell.java @@ -147,11 +147,7 @@ public class ExpiringCell extends Cell @Override public String getString(CellNameType comparator) { - StringBuilder sb = new StringBuilder(); - sb.append(super.getString(comparator)); - sb.append("!"); - sb.append(timeToLive); - return sb.toString(); + return String.format("%s!%d", super.getString(comparator), timeToLive); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8a0a3a6/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 7927574..2a8d68d 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -616,7 +616,7 @@ public class CompactionManager implements CompactionManagerMBean { public static CleanupStrategy get(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, CounterId.OneShotRenewer renewer) { - if (cfs.indexManager.hasIndexes() || cfs.metadata.getDefaultValidator().isCommutative()) + if (cfs.indexManager.hasIndexes() || cfs.metadata.isCounter()) return new Full(cfs, ranges, renewer); return new Bounded(cfs, ranges); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8a0a3a6/src/java/org/apache/cassandra/db/compaction/Scrubber.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java index eabfdbc..978865c 100644 --- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java +++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java @@ -84,7 +84,7 @@ public class Scrubber implements Closeable this.controller = isOffline ? new ScrubController(cfs) : new CompactionController(cfs, Collections.singleton(sstable), CompactionManager.getDefaultGcBefore(cfs)); - this.isCommutative = cfs.metadata.getDefaultValidator().isCommutative(); + this.isCommutative = cfs.metadata.isCounter(); this.expectedBloomFilterSize = Math.max(cfs.metadata.getIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(toScrub))); // loop through each row, deserializing to check for damage. http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8a0a3a6/src/java/org/apache/cassandra/db/marshal/AbstractCommutativeType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractCommutativeType.java b/src/java/org/apache/cassandra/db/marshal/AbstractCommutativeType.java deleted file mode 100644 index 4b26d5d..0000000 --- a/src/java/org/apache/cassandra/db/marshal/AbstractCommutativeType.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db.marshal; - -import java.nio.ByteBuffer; - -import org.apache.cassandra.db.Cell; -import org.apache.cassandra.db.composites.CellName; -import org.apache.cassandra.db.context.CounterContext; -import org.apache.cassandra.utils.ByteBufferUtil; - -public abstract class AbstractCommutativeType extends AbstractType<Long> -{ - public boolean isCommutative() - { - return true; - } - - @Override - public Long compose(ByteBuffer bytes) - { - return CounterContext.instance().total(bytes); - } - - @Override - public ByteBuffer decompose(Long value) - { - return ByteBufferUtil.bytes(value); - } - - /** - * create commutative column - */ - public abstract Cell createColumn(CellName name, ByteBuffer value, long timestamp); -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8a0a3a6/src/java/org/apache/cassandra/db/marshal/AbstractType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractType.java b/src/java/org/apache/cassandra/db/marshal/AbstractType.java index cefa465..ce233de 100644 --- a/src/java/org/apache/cassandra/db/marshal/AbstractType.java +++ b/src/java/org/apache/cassandra/db/marshal/AbstractType.java @@ -104,12 +104,6 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer> public abstract TypeSerializer<T> getSerializer(); - /** @deprecated use reverseComparator field instead */ - public Comparator<ByteBuffer> getReverseComparator() - { - return reverseComparator; - } - /* convenience method */ public String getString(Collection<ByteBuffer> names) { @@ -121,7 +115,7 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer> return builder.toString(); } - public boolean isCommutative() + public boolean isCounter() { return false; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8a0a3a6/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java b/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java index e1a886d..73e9f6f 100644 --- a/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java +++ b/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java @@ -20,18 +20,34 @@ package org.apache.cassandra.db.marshal; import java.nio.ByteBuffer; import org.apache.cassandra.cql3.CQL3Type; -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.composites.CellName; +import org.apache.cassandra.db.context.CounterContext; import org.apache.cassandra.serializers.TypeSerializer; import org.apache.cassandra.serializers.CounterSerializer; import org.apache.cassandra.utils.ByteBufferUtil; -public class CounterColumnType extends AbstractCommutativeType +public class CounterColumnType extends AbstractType<Long> { public static final CounterColumnType instance = new CounterColumnType(); CounterColumnType() {} // singleton + public boolean isCounter() + { + return true; + } + + @Override + public Long compose(ByteBuffer bytes) + { + return CounterContext.instance().total(bytes); + } + + @Override + public ByteBuffer decompose(Long value) + { + return ByteBufferUtil.bytes(value); + } + public int compare(ByteBuffer o1, ByteBuffer o2) { if (o1 == null) @@ -45,14 +61,6 @@ public class CounterColumnType extends AbstractCommutativeType return ByteBufferUtil.bytesToHex(bytes); } - /** - * create commutative column - */ - public Cell createColumn(CellName name, ByteBuffer value, long timestamp) - { - return new CounterUpdateCell(name, value, timestamp); - } - public ByteBuffer fromString(String source) { return ByteBufferUtil.hexToBytes(source); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8a0a3a6/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index d4bd4ff..cf8636b 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -139,10 +139,9 @@ public class StorageProxy implements StorageProxyMBean Iterable<InetAddress> targets, AbstractWriteResponseHandler responseHandler, String localDataCenter, - ConsistencyLevel consistency_level) + ConsistencyLevel consistencyLevel) { - Runnable runnable = counterWriteTask(mutation, targets, responseHandler, localDataCenter, consistency_level); - runnable.run(); + counterWriteTask(mutation, targets, responseHandler, localDataCenter).run(); } }; @@ -152,10 +151,9 @@ public class StorageProxy implements StorageProxyMBean Iterable<InetAddress> targets, AbstractWriteResponseHandler responseHandler, String localDataCenter, - ConsistencyLevel consistency_level) + ConsistencyLevel consistencyLevel) { - Runnable runnable = counterWriteTask(mutation, targets, responseHandler, localDataCenter, consistency_level); - StageManager.getStage(Stage.MUTATION).execute(runnable); + StageManager.getStage(Stage.MUTATION).execute(counterWriteTask(mutation, targets, responseHandler, localDataCenter)); } }; } @@ -1100,8 +1098,7 @@ public class StorageProxy implements StorageProxyMBean private static Runnable counterWriteTask(final IMutation mutation, final Iterable<InetAddress> targets, final AbstractWriteResponseHandler responseHandler, - final String localDataCenter, - final ConsistencyLevel consistency_level) + final String localDataCenter) { return new DroppableRunnable(MessagingService.Verb.COUNTER_MUTATION) { @@ -1120,7 +1117,7 @@ public class StorageProxy implements StorageProxyMBean // then send to replicas, if any final Set<InetAddress> remotes = Sets.difference(ImmutableSet.copyOf(targets), ImmutableSet.of(FBUtilities.getBroadcastAddress())); - if (cm.shouldReplicateOnWrite() && !remotes.isEmpty()) + if (!remotes.isEmpty() && cm.shouldReplicateOnWrite()) { // We do the replication on another stage because it involves a read (see CM.makeReplicationMutation) // and we want to avoid blocking too much the MUTATION stage @@ -2003,7 +2000,11 @@ public class StorageProxy implements StorageProxyMBean public interface WritePerformer { - public void apply(IMutation mutation, Iterable<InetAddress> targets, AbstractWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws OverloadedException; + public void apply(IMutation mutation, + Iterable<InetAddress> targets, + AbstractWriteResponseHandler responseHandler, + String localDataCenter, + ConsistencyLevel consistencyLevel) throws OverloadedException; } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8a0a3a6/src/java/org/apache/cassandra/thrift/CassandraServer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java index 5859f92..2c2e821 100644 --- a/src/java/org/apache/cassandra/thrift/CassandraServer.java +++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java @@ -286,7 +286,7 @@ public class CassandraServer implements Cassandra.Iface if (cf.metadata().isSuper()) { - boolean isCounterCF = cf.metadata().getDefaultValidator().isCommutative(); + boolean isCounterCF = cf.metadata().isCounter(); return thriftifySuperColumns(cf.getSortedColumns(), reverseOrder, now, subcolumnsOnly, isCounterCF); } else @@ -829,7 +829,7 @@ public class CassandraServer implements Cassandra.Iface ThriftValidation.validateKey(metadata, key); org.apache.cassandra.db.Mutation mutation; - if (metadata.getDefaultValidator().isCommutative()) + if (metadata.isCounter()) { ThriftConversion.fromThrift(consistency_level).validateCounterForWrite(metadata); counterMutation = counterMutation == null ? new org.apache.cassandra.db.Mutation(keyspace, key) : counterMutation; http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8a0a3a6/src/java/org/apache/cassandra/thrift/ThriftValidation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/ThriftValidation.java b/src/java/org/apache/cassandra/thrift/ThriftValidation.java index d491636..49cf39b 100644 --- a/src/java/org/apache/cassandra/thrift/ThriftValidation.java +++ b/src/java/org/apache/cassandra/thrift/ThriftValidation.java @@ -92,12 +92,12 @@ public class ThriftValidation if (isCommutativeOp) { - if (!metadata.getDefaultValidator().isCommutative()) + if (!metadata.isCounter()) throw new org.apache.cassandra.exceptions.InvalidRequestException("invalid operation for non commutative columnfamily " + cfName); } else { - if (metadata.getDefaultValidator().isCommutative()) + if (metadata.isCounter()) throw new org.apache.cassandra.exceptions.InvalidRequestException("invalid operation for commutative columnfamily " + cfName); } return metadata; @@ -297,7 +297,7 @@ public class ThriftValidation public static void validateColumnOrSuperColumn(CFMetaData metadata, ColumnOrSuperColumn cosc) throws org.apache.cassandra.exceptions.InvalidRequestException { - boolean isCommutative = metadata.getDefaultValidator().isCommutative(); + boolean isCommutative = metadata.isCounter(); int nulls = 0; if (cosc.column == null) nulls++; @@ -405,7 +405,7 @@ public class ThriftValidation throw new org.apache.cassandra.exceptions.InvalidRequestException(msg); } - if (metadata.getDefaultValidator().isCommutative()) + if (metadata.isCounter()) { // forcing server timestamp even if a timestamp was set for coherence with other counter operation del.timestamp = System.currentTimeMillis();
