Repository: cassandra Updated Branches: refs/heads/trunk f4e8fc3f6 -> 0015f37a3
Track presence of legacy counter shards in sstables patch by Aleksey Yeschenko; reviewed by Marcus Eriksson for CASSANDRA-6888 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/57b18e60 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/57b18e60 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/57b18e60 Branch: refs/heads/trunk Commit: 57b18e600c6d79d19d29f3569b81cb946ef9ee57 Parents: 6d901f9 Author: Aleksey Yeschenko <[email protected]> Authored: Fri Apr 4 17:36:15 2014 +0300 Committer: Aleksey Yeschenko <[email protected]> Committed: Fri Apr 4 17:36:15 2014 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/db/ColumnFamily.java | 12 ++- .../org/apache/cassandra/db/CounterCell.java | 5 ++ .../db/compaction/LazilyCompactedRow.java | 12 +-- .../cassandra/db/context/CounterContext.java | 18 +++++ .../cassandra/io/sstable/ColumnStats.java | 12 ++- .../apache/cassandra/io/sstable/Descriptor.java | 3 + .../cassandra/io/sstable/SSTableWriter.java | 26 ++++--- .../metadata/LegacyMetadataSerializer.java | 1 + .../io/sstable/metadata/MetadataCollector.java | 67 ++++++++++------- .../io/sstable/metadata/StatsMetadata.java | 14 ++++ .../io/sstable/SSTableMetadataTest.java | 77 +++++++++++++++++--- 12 files changed, 194 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/57b18e60/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index ac2f624..4cfc957 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -40,6 +40,7 @@ * Optimize CounterColumn#reconcile() (CASSANDRA-6953) * Properly remove 1.2 sstable support in 2.1 (CASSANDRA-6869) * Lock counter cells, not partitions (CASSANDRA-6880) + * Track presence of legacy counter shards in sstables 
(CASSANDRA-6888) Merged from 2.0: * Allow compaction of system tables during startup (CASSANDRA-6913) * Restrict Windows to parallel repairs (CASSANDRA-6907) http://git-wip-us.apache.org/repos/asf/cassandra/blob/57b18e60/src/java/org/apache/cassandra/db/ColumnFamily.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java index e7aab37..da404b0 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamily.java +++ b/src/java/org/apache/cassandra/db/ColumnFamily.java @@ -402,6 +402,7 @@ public abstract class ColumnFamily implements Iterable<Cell>, IRowCacheEntry int maxLocalDeletionTime = Integer.MIN_VALUE; List<ByteBuffer> minColumnNamesSeen = Collections.emptyList(); List<ByteBuffer> maxColumnNamesSeen = Collections.emptyList(); + boolean hasLegacyCounterShards = false; for (Cell cell : this) { if (deletionInfo().getTopLevelDeletion().localDeletionTime < Integer.MAX_VALUE) @@ -420,8 +421,17 @@ public abstract class ColumnFamily implements Iterable<Cell>, IRowCacheEntry tombstones.update(deletionTime); minColumnNamesSeen = ColumnNameHelper.minComponents(minColumnNamesSeen, cell.name, metadata.comparator); maxColumnNamesSeen = ColumnNameHelper.maxComponents(maxColumnNamesSeen, cell.name, metadata.comparator); + if (cell instanceof CounterCell) + hasLegacyCounterShards = hasLegacyCounterShards || ((CounterCell) cell).hasLegacyShards(); } - return new ColumnStats(getColumnCount(), minTimestampSeen, maxTimestampSeen, maxLocalDeletionTime, tombstones, minColumnNamesSeen, maxColumnNamesSeen); + return new ColumnStats(getColumnCount(), + minTimestampSeen, + maxTimestampSeen, + maxLocalDeletionTime, + tombstones, + minColumnNamesSeen, + maxColumnNamesSeen, + hasLegacyCounterShards); } public boolean isMarkedForDelete() http://git-wip-us.apache.org/repos/asf/cassandra/blob/57b18e60/src/java/org/apache/cassandra/db/CounterCell.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CounterCell.java b/src/java/org/apache/cassandra/db/CounterCell.java index 6b588ef..fc4ac3f 100644 --- a/src/java/org/apache/cassandra/db/CounterCell.java +++ b/src/java/org/apache/cassandra/db/CounterCell.java @@ -182,6 +182,11 @@ public class CounterCell extends Cell Math.max(timestampOfLastDelete(), ((CounterCell) cell).timestampOfLastDelete())); } + public boolean hasLegacyShards() + { + return contextManager.hasLegacyShards(value); + } + @Override public boolean equals(Object o) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/57b18e60/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java index 12a9308..2fefe0d 100644 --- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java +++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.db.compaction; -import java.io.DataOutput; import java.io.IOException; import java.nio.ByteBuffer; import java.security.MessageDigest; @@ -125,8 +124,8 @@ public class LazilyCompactedRow extends AbstractCompactedRow reducer.maxLocalDeletionTimeSeen, reducer.tombstones, reducer.minColumnNameSeen, - reducer.maxColumnNameSeen - ); + reducer.maxColumnNameSeen, + reducer.hasLegacyCounterShards); // in case no columns were ever written, we may still need to write an empty header with a top-level tombstone indexBuilder.maybeWriteEmptyRowHeader(); @@ -202,6 +201,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow StreamingHistogram tombstones = new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE); List<ByteBuffer> minColumnNameSeen = Collections.emptyList(); List<ByteBuffer> 
maxColumnNameSeen = Collections.emptyList(); + boolean hasLegacyCounterShards = false; /** * Called once per version of a cell that we need to merge, after which getReduced() is called. In other words, @@ -293,9 +293,11 @@ public class LazilyCompactedRow extends AbstractCompactedRow int deletionTime = reduced.getLocalDeletionTime(); if (deletionTime < Integer.MAX_VALUE) - { tombstones.update(deletionTime); - } + + if (reduced instanceof CounterCell) + hasLegacyCounterShards = hasLegacyCounterShards || ((CounterCell) reduced).hasLegacyShards(); + return reduced; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/57b18e60/src/java/org/apache/cassandra/db/context/CounterContext.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/context/CounterContext.java b/src/java/org/apache/cassandra/db/context/CounterContext.java index 0b1677b..455ffc7 100644 --- a/src/java/org/apache/cassandra/db/context/CounterContext.java +++ b/src/java/org/apache/cassandra/db/context/CounterContext.java @@ -546,6 +546,24 @@ public class CounterContext } /** + * Detects whether or not the context has any legacy (local or remote) shards in it. + */ + public boolean hasLegacyShards(ByteBuffer context) + { + int totalCount = (context.remaining() - headerLength(context)) / STEP_LENGTH; + int localAndGlobalCount = Math.abs(context.getShort(context.position())); + + if (localAndGlobalCount < totalCount) + return true; // remote shard(s) present + + for (int i = 0; i < localAndGlobalCount; i++) + if (context.getShort(context.position() + HEADER_SIZE_LENGTH + i * HEADER_ELT_LENGTH) >= 0) + return true; // found a local shard + + return false; + } + + /** * Mark context to delete local references afterward. * Marking is done by multiply #elt by -1 to preserve header length * and #elt count in order to clear all local refs later. 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/57b18e60/src/java/org/apache/cassandra/io/sstable/ColumnStats.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/ColumnStats.java b/src/java/org/apache/cassandra/io/sstable/ColumnStats.java index bd3bd1c..d09f965 100644 --- a/src/java/org/apache/cassandra/io/sstable/ColumnStats.java +++ b/src/java/org/apache/cassandra/io/sstable/ColumnStats.java @@ -41,7 +41,16 @@ public class ColumnStats public final List<ByteBuffer> minColumnNames; public final List<ByteBuffer> maxColumnNames; - public ColumnStats(int columnCount, long minTimestamp, long maxTimestamp, int maxLocalDeletionTime, StreamingHistogram tombstoneHistogram, List<ByteBuffer> minColumnNames, List<ByteBuffer> maxColumnNames) + public final boolean hasLegacyCounterShards; + + public ColumnStats(int columnCount, + long minTimestamp, + long maxTimestamp, + int maxLocalDeletionTime, + StreamingHistogram tombstoneHistogram, + List<ByteBuffer> minColumnNames, + List<ByteBuffer> maxColumnNames, + boolean hasLegacyCounterShards) { this.minTimestamp = minTimestamp; this.maxTimestamp = maxTimestamp; @@ -50,5 +59,6 @@ public class ColumnStats this.tombstoneHistogram = tombstoneHistogram; this.minColumnNames = minColumnNames; this.maxColumnNames = maxColumnNames; + this.hasLegacyCounterShards = hasLegacyCounterShards; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/57b18e60/src/java/org/apache/cassandra/io/sstable/Descriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java index db6f13a..18609bf 100644 --- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java +++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java @@ -62,6 +62,7 @@ public class Descriptor // ka (2.1.0): new Statistics.db file format // index 
summaries can be downsampled and the sampling level is persisted // switch uncompressed checksums to adler32 + // tracks presence of legacy (local and remote) counter shards public static final Version CURRENT = new Version(current_version); @@ -73,6 +74,7 @@ public class Descriptor public final boolean newStatsFile; public final boolean hasAllAdlerChecksums; public final boolean hasRepairedAt; + public final boolean tracksLegacyCounterShards; public Version(String version) { @@ -83,6 +85,7 @@ public class Descriptor newStatsFile = version.compareTo("ka") >= 0; hasAllAdlerChecksums = version.compareTo("ka") >= 0; hasRepairedAt = version.compareTo("ka") >= 0; + tracksLegacyCounterShards = version.compareTo("ka") >= 0; } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/57b18e60/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java index 1dc2c98..4a7729e 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java @@ -227,8 +227,9 @@ public class SSTableWriter extends SSTable List<ByteBuffer> minColumnNames = Collections.emptyList(); List<ByteBuffer> maxColumnNames = Collections.emptyList(); StreamingHistogram tombstones = new StreamingHistogram(TOMBSTONE_HISTOGRAM_BIN_SIZE); - ColumnFamily cf = ArrayBackedSortedColumns.factory.create(metadata); + boolean hasLegacyCounterShards = false; + ColumnFamily cf = ArrayBackedSortedColumns.factory.create(metadata); cf.delete(DeletionTime.serializer.deserialize(in)); ColumnIndex.Builder columnIndexer = new ColumnIndex.Builder(cf, key.key, dataFile.stream); @@ -253,14 +254,16 @@ public class SSTableWriter extends SSTable OnDiskAtom atom = iter.next(); if (atom == null) break; + if (atom instanceof CounterCell) + { atom = ((CounterCell) 
atom).markLocalToBeCleared(); + hasLegacyCounterShards = hasLegacyCounterShards || ((CounterCell) atom).hasLegacyShards(); + } int deletionTime = atom.getLocalDeletionTime(); if (deletionTime < Integer.MAX_VALUE) - { tombstones.update(deletionTime); - } minTimestamp = Math.min(minTimestamp, atom.timestamp()); maxTimestamp = Math.max(maxTimestamp, atom.timestamp()); minColumnNames = ColumnNameHelper.minComponents(minColumnNames, atom.name(), metadata.comparator); @@ -278,14 +281,15 @@ public class SSTableWriter extends SSTable throw new FSWriteError(e, dataFile.getPath()); } - sstableMetadataCollector.updateMinTimestamp(minTimestamp); - sstableMetadataCollector.updateMaxTimestamp(maxTimestamp); - sstableMetadataCollector.updateMaxLocalDeletionTime(maxLocalDeletionTime); - sstableMetadataCollector.addRowSize(dataFile.getFilePointer() - currentPosition); - sstableMetadataCollector.addColumnCount(columnIndexer.writtenAtomCount()); - sstableMetadataCollector.mergeTombstoneHistogram(tombstones); - sstableMetadataCollector.updateMinColumnNames(minColumnNames); - sstableMetadataCollector.updateMaxColumnNames(maxColumnNames); + sstableMetadataCollector.updateMinTimestamp(minTimestamp) + .updateMaxTimestamp(maxTimestamp) + .updateMaxLocalDeletionTime(maxLocalDeletionTime) + .addRowSize(dataFile.getFilePointer() - currentPosition) + .addColumnCount(columnIndexer.writtenAtomCount()) + .mergeTombstoneHistogram(tombstones) + .updateMinColumnNames(minColumnNames) + .updateMaxColumnNames(maxColumnNames) + .updateHasLegacyCounterShards(hasLegacyCounterShards); afterAppend(key, currentPosition, RowIndexEntry.create(currentPosition, cf.deletionInfo().getTopLevelDeletion(), columnIndexer.build())); return currentPosition; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/57b18e60/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java index 59f7be5..4bd060e 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java @@ -133,6 +133,7 @@ public class LegacyMetadataSerializer extends MetadataSerializer sstableLevel, minColumnNames, maxColumnNames, + true, ActiveRepairService.UNREPAIRED_SSTABLE)); if (types.contains(MetadataType.COMPACTION)) components.put(MetadataType.COMPACTION, http://git-wip-us.apache.org/repos/asf/cassandra/blob/57b18e60/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java index 84c35c7..84789a6 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java @@ -21,10 +21,10 @@ import java.io.File; import java.nio.ByteBuffer; import java.util.*; -import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus; -import com.clearspring.analytics.stream.cardinality.ICardinality; import com.google.common.collect.Maps; +import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus; +import com.clearspring.analytics.stream.cardinality.ICardinality; import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.db.composites.CellNameType; import org.apache.cassandra.io.sstable.*; @@ -67,6 +67,7 @@ public class MetadataCollector 0, Collections.<ByteBuffer>emptyList(), Collections.<ByteBuffer>emptyList(), + true, ActiveRepairService.UNREPAIRED_SSTABLE); } @@ -82,6 +83,8 @@ public class MetadataCollector protected int sstableLevel; 
protected List<ByteBuffer> minColumnNames = Collections.emptyList(); protected List<ByteBuffer> maxColumnNames = Collections.emptyList(); + protected boolean hasLegacyCounterShards = false; + /** * Default cardinality estimation method is to use HyperLogLog++. * Parameter here(p=13, sp=25) should give reasonable estimation @@ -108,56 +111,62 @@ public class MetadataCollector { addAncestor(sstable.descriptor.generation); for (Integer i : sstable.getAncestors()) - { if (new File(sstable.descriptor.withGeneration(i).filenameFor(Component.DATA)).exists()) addAncestor(i); - } } } - public void addKey(ByteBuffer key) + public MetadataCollector addKey(ByteBuffer key) { long hashed = MurmurHash.hash2_64(key, key.position(), key.remaining(), 0); cardinality.offerHashed(hashed); + return this; } - public void addRowSize(long rowSize) + public MetadataCollector addRowSize(long rowSize) { estimatedRowSize.add(rowSize); + return this; } - public void addColumnCount(long columnCount) + public MetadataCollector addColumnCount(long columnCount) { estimatedColumnCount.add(columnCount); + return this; } - public void mergeTombstoneHistogram(StreamingHistogram histogram) + public MetadataCollector mergeTombstoneHistogram(StreamingHistogram histogram) { estimatedTombstoneDropTime.merge(histogram); + return this; } /** * Ratio is compressed/uncompressed and it is * if you have 1.x then compression isn't helping */ - public void addCompressionRatio(long compressed, long uncompressed) + public MetadataCollector addCompressionRatio(long compressed, long uncompressed) { compressionRatio = (double) compressed/uncompressed; + return this; } - public void updateMinTimestamp(long potentialMin) + public MetadataCollector updateMinTimestamp(long potentialMin) { minTimestamp = Math.min(minTimestamp, potentialMin); + return this; } - public void updateMaxTimestamp(long potentialMax) + public MetadataCollector updateMaxTimestamp(long potentialMax) { maxTimestamp = Math.max(maxTimestamp, 
potentialMax); + return this; } - public void updateMaxLocalDeletionTime(int maxLocalDeletionTime) + public MetadataCollector updateMaxLocalDeletionTime(int maxLocalDeletionTime) { this.maxLocalDeletionTime = Math.max(this.maxLocalDeletionTime, maxLocalDeletionTime); + return this; } public MetadataCollector estimatedRowSize(EstimatedHistogram estimatedRowSize) @@ -184,18 +193,6 @@ public class MetadataCollector return this; } - public void update(long size, ColumnStats stats) - { - updateMinTimestamp(stats.minTimestamp); - updateMaxTimestamp(stats.maxTimestamp); - updateMaxLocalDeletionTime(stats.maxLocalDeletionTime); - addRowSize(size); - addColumnCount(stats.columnCount); - mergeTombstoneHistogram(stats.tombstoneHistogram); - updateMinColumnNames(stats.minColumnNames); - updateMaxColumnNames(stats.maxColumnNames); - } - public MetadataCollector sstableLevel(int sstableLevel) { this.sstableLevel = sstableLevel; @@ -216,6 +213,26 @@ public class MetadataCollector return this; } + public MetadataCollector updateHasLegacyCounterShards(boolean hasLegacyCounterShards) + { + this.hasLegacyCounterShards = this.hasLegacyCounterShards || hasLegacyCounterShards; + return this; + } + + public MetadataCollector update(long rowSize, ColumnStats stats) + { + updateMinTimestamp(stats.minTimestamp); + updateMaxTimestamp(stats.maxTimestamp); + updateMaxLocalDeletionTime(stats.maxLocalDeletionTime); + addRowSize(rowSize); + addColumnCount(stats.columnCount); + mergeTombstoneHistogram(stats.tombstoneHistogram); + updateMinColumnNames(stats.minColumnNames); + updateMaxColumnNames(stats.maxColumnNames); + updateHasLegacyCounterShards(stats.hasLegacyCounterShards); + return this; + } + public Map<MetadataType, MetadataComponent> finalizeMetadata(String partitioner, double bloomFilterFPChance, long repairedAt) { Map<MetadataType, MetadataComponent> components = Maps.newHashMap(); @@ -231,9 +248,9 @@ public class MetadataCollector sstableLevel, minColumnNames, maxColumnNames, + 
hasLegacyCounterShards, repairedAt)); components.put(MetadataType.COMPACTION, new CompactionMetadata(ancestors, cardinality)); return components; } - } http://git-wip-us.apache.org/repos/asf/cassandra/blob/57b18e60/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java index 1c3dfd5..900bd4e 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java @@ -52,6 +52,7 @@ public class StatsMetadata extends MetadataComponent public final int sstableLevel; public final List<ByteBuffer> maxColumnNames; public final List<ByteBuffer> minColumnNames; + public final boolean hasLegacyCounterShards; public final long repairedAt; public StatsMetadata(EstimatedHistogram estimatedRowSize, @@ -65,6 +66,7 @@ public class StatsMetadata extends MetadataComponent int sstableLevel, List<ByteBuffer> minColumnNames, List<ByteBuffer> maxColumnNames, + boolean hasLegacyCounterShards, long repairedAt) { this.estimatedRowSize = estimatedRowSize; @@ -78,6 +80,7 @@ public class StatsMetadata extends MetadataComponent this.sstableLevel = sstableLevel; this.minColumnNames = minColumnNames; this.maxColumnNames = maxColumnNames; + this.hasLegacyCounterShards = hasLegacyCounterShards; this.repairedAt = repairedAt; } @@ -123,6 +126,7 @@ public class StatsMetadata extends MetadataComponent newLevel, maxColumnNames, minColumnNames, + hasLegacyCounterShards, repairedAt); } @@ -139,6 +143,7 @@ public class StatsMetadata extends MetadataComponent sstableLevel, maxColumnNames, minColumnNames, + hasLegacyCounterShards, newRepairedAt); } @@ -162,6 +167,7 @@ public class StatsMetadata extends MetadataComponent .append(repairedAt, that.repairedAt) .append(maxColumnNames, 
that.maxColumnNames) .append(minColumnNames, that.minColumnNames) + .append(hasLegacyCounterShards, that.hasLegacyCounterShards) .build(); } @@ -181,6 +187,7 @@ public class StatsMetadata extends MetadataComponent .append(repairedAt) .append(maxColumnNames) .append(minColumnNames) + .append(hasLegacyCounterShards) .build(); } @@ -203,6 +210,7 @@ public class StatsMetadata extends MetadataComponent size += 4; for (ByteBuffer columnName : component.maxColumnNames) size += 2 + columnName.remaining(); // with short length + size += TypeSizes.NATIVE.sizeof(component.hasLegacyCounterShards); return size; } @@ -224,6 +232,7 @@ public class StatsMetadata extends MetadataComponent out.writeInt(component.maxColumnNames.size()); for (ByteBuffer columnName : component.maxColumnNames) ByteBufferUtil.writeWithShortLength(columnName, out); + out.writeBoolean(component.hasLegacyCounterShards); } public StatsMetadata deserialize(Descriptor.Version version, DataInput in) throws IOException @@ -251,6 +260,10 @@ public class StatsMetadata extends MetadataComponent for (int i = 0; i < colCount; i++) maxColumnNames.add(ByteBufferUtil.readWithShortLength(in)); + boolean hasLegacyCounterShards = true; + if (version.tracksLegacyCounterShards) + hasLegacyCounterShards = in.readBoolean(); + return new StatsMetadata(rowSizes, columnCounts, replayPosition, @@ -262,6 +275,7 @@ public class StatsMetadata extends MetadataComponent sstableLevel, minColumnNames, maxColumnNames, + hasLegacyCounterShards, repairedAt); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/57b18e60/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataTest.java index 78f248b..1624a6b 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataTest.java +++ 
b/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataTest.java @@ -1,4 +1,3 @@ -package org.apache.cassandra.io.sstable; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,7 +18,7 @@ package org.apache.cassandra.io.sstable; * under the License. * */ - +package org.apache.cassandra.io.sstable; import java.nio.ByteBuffer; import java.nio.charset.CharacterCodingException; @@ -32,11 +31,16 @@ import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; import org.apache.cassandra.db.*; import org.apache.cassandra.db.composites.*; +import org.apache.cassandra.db.context.CounterContext; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.CounterId; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.apache.cassandra.Util.cellname; + public class SSTableMetadataTest extends SchemaLoader { @Test @@ -50,14 +54,14 @@ public class SSTableMetadataTest extends SchemaLoader DecoratedKey key = Util.dk(Integer.toString(i)); Mutation rm = new Mutation("Keyspace1", key.key); for (int j = 0; j < 10; j++) - rm.add("Standard1", Util.cellname(Integer.toString(j)), + rm.add("Standard1", cellname(Integer.toString(j)), ByteBufferUtil.EMPTY_BYTE_BUFFER, timestamp, 10 + j); rm.apply(); } Mutation rm = new Mutation("Keyspace1", Util.dk("longttl").key); - rm.add("Standard1", Util.cellname("col"), + rm.add("Standard1", cellname("col"), ByteBufferUtil.EMPTY_BYTE_BUFFER, timestamp, 10000); @@ -73,7 +77,7 @@ public class SSTableMetadataTest extends SchemaLoader } rm = new Mutation("Keyspace1", Util.dk("longttl2").key); - rm.add("Standard1", Util.cellname("col"), + rm.add("Standard1", cellname("col"), ByteBufferUtil.EMPTY_BYTE_BUFFER, timestamp, 20000); @@ -81,7 +85,7 @@ public class SSTableMetadataTest extends SchemaLoader ttltimestamp = (int) (System.currentTimeMillis()/1000); store.forceBlockingFlush(); assertEquals(2, 
store.getSSTables().size()); - List<SSTableReader> sstables = new ArrayList<SSTableReader>(store.getSSTables()); + List<SSTableReader> sstables = new ArrayList<>(store.getSSTables()); if(sstables.get(0).getSSTableMetadata().maxLocalDeletionTime < sstables.get(1).getSSTableMetadata().maxLocalDeletionTime) { assertEquals(sstables.get(0).getSSTableMetadata().maxLocalDeletionTime, firstDelTime); @@ -121,11 +125,11 @@ public class SSTableMetadataTest extends SchemaLoader DecoratedKey key = Util.dk("deletetest"); Mutation rm = new Mutation("Keyspace1", key.key); for (int i = 0; i<5; i++) - rm.add("Standard2", Util.cellname("deletecolumn"+i), + rm.add("Standard2", cellname("deletecolumn" + i), ByteBufferUtil.EMPTY_BYTE_BUFFER, timestamp, 100); - rm.add("Standard2", Util.cellname("todelete"), + rm.add("Standard2", cellname("todelete"), ByteBufferUtil.EMPTY_BYTE_BUFFER, timestamp, 1000); @@ -140,7 +144,7 @@ public class SSTableMetadataTest extends SchemaLoader assertEquals(ttltimestamp + 1000, firstMaxDelTime, 10); } rm = new Mutation("Keyspace1", key.key); - rm.delete("Standard2", Util.cellname("todelete"), timestamp + 1); + rm.delete("Standard2", cellname("todelete"), timestamp + 1); rm.apply(); store.forceBlockingFlush(); assertEquals(2,store.getSSTables().size()); @@ -174,7 +178,7 @@ public class SSTableMetadataTest extends SchemaLoader Mutation rm = new Mutation("Keyspace1", key.key); for (int i = 100; i<150; i++) { - rm.add("Standard3", Util.cellname(j+"col"+i), ByteBufferUtil.EMPTY_BYTE_BUFFER, System.currentTimeMillis()); + rm.add("Standard3", cellname(j + "col" + i), ByteBufferUtil.EMPTY_BYTE_BUFFER, System.currentTimeMillis()); } rm.apply(); } @@ -189,7 +193,7 @@ public class SSTableMetadataTest extends SchemaLoader Mutation rm = new Mutation("Keyspace1", key.key); for (int i = 101; i<299; i++) { - rm.add("Standard3", Util.cellname(9+"col"+i), ByteBufferUtil.EMPTY_BYTE_BUFFER, System.currentTimeMillis()); + rm.add("Standard3", cellname(9 + "col" + i), 
ByteBufferUtil.EMPTY_BYTE_BUFFER, System.currentTimeMillis()); } rm.apply(); @@ -202,6 +206,7 @@ public class SSTableMetadataTest extends SchemaLoader assertEquals(ByteBufferUtil.string(sstable.getSSTableMetadata().maxColumnNames.get(0)), "9col298"); } } + @Test public void testMaxMinComposites() throws CharacterCodingException, ExecutionException, InterruptedException { @@ -251,4 +256,54 @@ public class SSTableMetadataTest extends SchemaLoader assertEquals(0, ByteBufferUtil.toInt(sstable.getSSTableMetadata().minColumnNames.get(1))); } } + + @Test + public void testLegacyCounterShardTracking() + { + ColumnFamilyStore cfs = Keyspace.open("Keyspace1").getColumnFamilyStore("Counter1"); + + // A cell with all shards + CounterContext.ContextState state = CounterContext.ContextState.allocate(1, 1, 1); + state.writeGlobal(CounterId.fromInt(1), 1L, 1L); + state.writeLocal(CounterId.fromInt(2), 1L, 1L); + state.writeRemote(CounterId.fromInt(3), 1L, 1L); + ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata); + cells.addColumn(new CounterCell(cellname("col"), state.context, 1L, Long.MIN_VALUE)); + new Mutation(Util.dk("k").key, cells).apply(); + cfs.forceBlockingFlush(); + assertTrue(cfs.getSSTables().iterator().next().getSSTableMetadata().hasLegacyCounterShards); + cfs.truncateBlocking(); + + // A cell with global and remote shards + state = CounterContext.ContextState.allocate(0, 1, 1); + state.writeLocal(CounterId.fromInt(2), 1L, 1L); + state.writeRemote(CounterId.fromInt(3), 1L, 1L); + cells = ArrayBackedSortedColumns.factory.create(cfs.metadata); + cells.addColumn(new CounterCell(cellname("col"), state.context, 1L, Long.MIN_VALUE)); + new Mutation(Util.dk("k").key, cells).apply(); + cfs.forceBlockingFlush(); + assertTrue(cfs.getSSTables().iterator().next().getSSTableMetadata().hasLegacyCounterShards); + cfs.truncateBlocking(); + + // A cell with global and local shards + state = CounterContext.ContextState.allocate(1, 1, 0); + 
state.writeGlobal(CounterId.fromInt(1), 1L, 1L); + state.writeLocal(CounterId.fromInt(2), 1L, 1L); + cells = ArrayBackedSortedColumns.factory.create(cfs.metadata); + cells.addColumn(new CounterCell(cellname("col"), state.context, 1L, Long.MIN_VALUE)); + new Mutation(Util.dk("k").key, cells).apply(); + cfs.forceBlockingFlush(); + assertTrue(cfs.getSSTables().iterator().next().getSSTableMetadata().hasLegacyCounterShards); + cfs.truncateBlocking(); + + // A cell with global only + state = CounterContext.ContextState.allocate(1, 0, 0); + state.writeGlobal(CounterId.fromInt(1), 1L, 1L); + cells = ArrayBackedSortedColumns.factory.create(cfs.metadata); + cells.addColumn(new CounterCell(cellname("col"), state.context, 1L, Long.MIN_VALUE)); + new Mutation(Util.dk("k").key, cells).apply(); + cfs.forceBlockingFlush(); + assertFalse(cfs.getSSTables().iterator().next().getSSTableMetadata().hasLegacyCounterShards); + cfs.truncateBlocking(); + } }
