Updated Branches: refs/heads/trunk a7b72140b -> b75ed1c34
Add missing files Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b75ed1c3 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b75ed1c3 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b75ed1c3 Branch: refs/heads/trunk Commit: b75ed1c34b11bfbe7cbb3af72e08d5f9843c2dbe Parents: a7b7214 Author: Marcus Eriksson <[email protected]> Authored: Sat Feb 8 13:22:13 2014 +0100 Committer: Marcus Eriksson <[email protected]> Committed: Sat Feb 8 13:22:13 2014 +0100 ---------------------------------------------------------------------- .../SSTableRepairStatusChanged.java | 15 +++ .../repair/messages/AnticompactionRequest.java | 64 ++++++++++++ .../repair/messages/PrepareMessage.java | 101 +++++++++++++++++++ .../tools/SSTableRepairedAtSetter.java | 92 +++++++++++++++++ .../serialization/2.1/db.RangeSliceCommand.bin | Bin 0 -> 801 bytes .../2.1/db.SliceByNamesReadCommand.bin | Bin 0 -> 485 bytes .../2.1/db.SliceFromReadCommand.bin | Bin 0 -> 485 bytes test/data/serialization/2.1/db.Truncation.bin | Bin 0 -> 257 bytes .../data/serialization/2.1/db.WriteResponse.bin | 0 .../serialization/2.1/gms.EndpointState.bin | Bin 0 -> 73 bytes test/data/serialization/2.1/gms.Gossip.bin | Bin 0 -> 158 bytes .../serialization/2.1/service.SyncComplete.bin | Bin 0 -> 358 bytes .../serialization/2.1/service.SyncRequest.bin | Bin 0 -> 223 bytes .../2.1/service.ValidationComplete.bin | Bin 0 -> 1381 bytes .../2.1/service.ValidationRequest.bin | Bin 0 -> 165 bytes .../serialization/2.1/utils.BloomFilter.bin | Bin 0 -> 2500016 bytes .../2.1/utils.EstimatedHistogram.bin | Bin 0 -> 97500 bytes .../db/compaction/AntiCompactionTest.java | 100 ++++++++++++++++++ 18 files changed, 372 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b75ed1c3/src/java/org/apache/cassandra/notifications/SSTableRepairStatusChanged.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/notifications/SSTableRepairStatusChanged.java b/src/java/org/apache/cassandra/notifications/SSTableRepairStatusChanged.java new file mode 100644 index 0000000..084599e --- /dev/null +++ b/src/java/org/apache/cassandra/notifications/SSTableRepairStatusChanged.java @@ -0,0 +1,15 @@ +package org.apache.cassandra.notifications; + +import java.util.Collection; + +import org.apache.cassandra.io.sstable.SSTableReader; + +public class SSTableRepairStatusChanged implements INotification +{ + public final Collection<SSTableReader> sstable; + + public SSTableRepairStatusChanged(Collection<SSTableReader> repairStatusChanged) + { + this.sstable = repairStatusChanged; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/b75ed1c3/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java b/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java new file mode 100644 index 0000000..daa49f0 --- /dev/null +++ b/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.repair.messages; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.UUID; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.repair.RepairJobDesc; +import org.apache.cassandra.utils.UUIDSerializer; + +public class AnticompactionRequest extends RepairMessage +{ + public static MessageSerializer serializer = new AnticompactionRequestSerializer(); + public final UUID parentRepairSession; + + public AnticompactionRequest(UUID parentRepairSession) + { + super(Type.ANTICOMPACTION_REQUEST, null); + this.parentRepairSession = parentRepairSession; + } + + public static class AnticompactionRequestSerializer implements MessageSerializer<AnticompactionRequest> + { + public void serialize(AnticompactionRequest message, DataOutput out, int version) throws IOException + { + UUIDSerializer.serializer.serialize(message.parentRepairSession, out, version); + } + + public AnticompactionRequest deserialize(DataInput in, int version) throws IOException + { + UUID parentRepairSession = UUIDSerializer.serializer.deserialize(in, version); + return new AnticompactionRequest(parentRepairSession); + } + + public long serializedSize(AnticompactionRequest message, int version) + { + return UUIDSerializer.serializer.serializedSize(message.parentRepairSession, version); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/b75ed1c3/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java new file mode 100644 index 0000000..defb584 --- /dev/null +++ b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.repair.messages; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.UUID; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.utils.UUIDSerializer; + + +public class PrepareMessage extends RepairMessage +{ + public final static MessageSerializer serializer = new PrepareMessageSerializer(); + public final List<UUID> cfIds; + public final Collection<Range<Token>> ranges; + + public final UUID parentRepairSession; + + public PrepareMessage(UUID parentRepairSession, List<UUID> cfIds, Collection<Range<Token>> ranges) + { + super(Type.PREPARE_MESSAGE, null); + this.parentRepairSession = parentRepairSession; + this.cfIds = cfIds; + this.ranges = ranges; + } + + public static class PrepareMessageSerializer implements MessageSerializer<PrepareMessage> + { + public void serialize(PrepareMessage message, DataOutput out, int version) throws IOException + { + out.writeInt(message.cfIds.size()); + for (UUID cfId : message.cfIds) + UUIDSerializer.serializer.serialize(cfId, out, version); + UUIDSerializer.serializer.serialize(message.parentRepairSession, out, version); + out.writeInt(message.ranges.size()); + for (Range r : message.ranges) + Range.serializer.serialize(r, out, version); + } + + public PrepareMessage deserialize(DataInput in, int version) throws IOException + { + int cfIdCount = in.readInt(); + List<UUID> cfIds = new ArrayList<>(cfIdCount); + for (int i = 0; i < cfIdCount; i++) + cfIds.add(UUIDSerializer.serializer.deserialize(in, version)); + UUID parentRepairSession = UUIDSerializer.serializer.deserialize(in, version); + int rangeCount = in.readInt(); + List<Range<Token>> ranges = new ArrayList<>(rangeCount); + for (int i = 0; i < rangeCount; i++) + ranges.add((Range<Token>) Range.serializer.deserialize(in, version).toTokenBounds()); + return new PrepareMessage(parentRepairSession, cfIds, ranges); + } + + public long serializedSize(PrepareMessage message, int version) + { + long size; + TypeSizes sizes = TypeSizes.NATIVE; + size = sizes.sizeof(message.cfIds.size()); + for (UUID cfId : message.cfIds) + size += UUIDSerializer.serializer.serializedSize(cfId, version); + size += UUIDSerializer.serializer.serializedSize(message.parentRepairSession, version); + size += sizes.sizeof(message.ranges.size()); + for (Range r : message.ranges) + size += Range.serializer.serializedSize(r, version); + return size; + } + } + + @Override + public String toString() + { + return "PrepareMessage{" + + "cfIds='" + cfIds + '\'' + + ", ranges=" + ranges + + ", parentRepairSession=" + parentRepairSession + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/b75ed1c3/src/java/org/apache/cassandra/tools/SSTableRepairedAtSetter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/SSTableRepairedAtSetter.java b/src/java/org/apache/cassandra/tools/SSTableRepairedAtSetter.java new file mode 100644 index 0000000..623e71b --- /dev/null +++ b/src/java/org/apache/cassandra/tools/SSTableRepairedAtSetter.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.tools; + +import java.io.File; +import java.io.IOException; +import java.io.PrintStream; +import java.nio.file.Files; +import java.nio.file.attribute.FileTime; +import java.util.EnumSet; +import java.util.Map; +import java.util.Set; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.metadata.CompactionMetadata; +import org.apache.cassandra.io.sstable.metadata.MetadataComponent; +import org.apache.cassandra.io.sstable.metadata.MetadataType; +import org.apache.cassandra.io.sstable.metadata.StatsMetadata; +import org.apache.cassandra.io.sstable.metadata.ValidationMetadata; +import org.apache.cassandra.service.ActiveRepairService; + +/** + * Set repairedAt status on a given set of sstables. + * + * If you pass --is-repaired, it will set the repairedAt time to the last modified time. + * + * If you know you ran repair 2 weeks ago, you can do something like + * + * for x in $(find /var/lib/cassandra/data/.../ -iname "*Data.db*" -mtime +14); do sstablerepairset --is-repaired $x; done + * + */ +public class SSTableRepairedAtSetter +{ + /** + * @param args a list of sstables whose metadata we are changing + */ + public static void main(String[] args) throws IOException + { + PrintStream out = System.out; + if (args.length == 0) + { + out.println("This command should be run with Cassandra stopped!"); + out.println("Usage: sstablerepairedset [--is-repaired | --is-unrepaired] <sstable>"); + System.exit(1); + } + + for (String s : args) + System.out.println(s); + + if (args.length != 3 || !args[0].equals("--really-set") || (!args[1].equals("--is-repaired") && !args[1].equals("--is-unrepaired"))) + { + out.println("This command should be run with Cassandra stopped, otherwise you will get very strange behavior"); + out.println("Verify that Cassandra is not running and then execute the command like this:"); + out.println("Usage: sstablelevelreset --really-set [--is-repaired | --is-unrepaired] <sstable>"); + System.exit(1); + } + + boolean setIsRepaired = args[1].equals("--is-repaired"); + + String fname = args[2]; + Descriptor descriptor = Descriptor.fromFilename(fname); + if (setIsRepaired) + { + FileTime f = Files.getLastModifiedTime(new File(descriptor.filenameFor(Component.DATA)).toPath()); + descriptor.getMetadataSerializer().mutateRepairedAt(descriptor, f.toMillis()); + } + else + { + descriptor.getMetadataSerializer().mutateRepairedAt(descriptor, ActiveRepairService.UNREPAIRED_SSTABLE); + } + + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/b75ed1c3/test/data/serialization/2.1/db.RangeSliceCommand.bin ---------------------------------------------------------------------- diff --git a/test/data/serialization/2.1/db.RangeSliceCommand.bin b/test/data/serialization/2.1/db.RangeSliceCommand.bin new file mode 100644 index 0000000..f852df0 Binary files /dev/null and b/test/data/serialization/2.1/db.RangeSliceCommand.bin differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/b75ed1c3/test/data/serialization/2.1/db.SliceByNamesReadCommand.bin ---------------------------------------------------------------------- diff --git a/test/data/serialization/2.1/db.SliceByNamesReadCommand.bin b/test/data/serialization/2.1/db.SliceByNamesReadCommand.bin new file mode 100644 index 0000000..e9c33a2 Binary files /dev/null and b/test/data/serialization/2.1/db.SliceByNamesReadCommand.bin differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/b75ed1c3/test/data/serialization/2.1/db.SliceFromReadCommand.bin ---------------------------------------------------------------------- diff --git a/test/data/serialization/2.1/db.SliceFromReadCommand.bin b/test/data/serialization/2.1/db.SliceFromReadCommand.bin new file mode 100644 index 0000000..1beede3 Binary files /dev/null and b/test/data/serialization/2.1/db.SliceFromReadCommand.bin differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/b75ed1c3/test/data/serialization/2.1/db.Truncation.bin ---------------------------------------------------------------------- diff --git a/test/data/serialization/2.1/db.Truncation.bin b/test/data/serialization/2.1/db.Truncation.bin new file mode 100644 index 0000000..ea67995 Binary files /dev/null and b/test/data/serialization/2.1/db.Truncation.bin differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/b75ed1c3/test/data/serialization/2.1/db.WriteResponse.bin ---------------------------------------------------------------------- diff --git a/test/data/serialization/2.1/db.WriteResponse.bin b/test/data/serialization/2.1/db.WriteResponse.bin new file mode 100644 index 0000000..e69de29 http://git-wip-us.apache.org/repos/asf/cassandra/blob/b75ed1c3/test/data/serialization/2.1/gms.EndpointState.bin ---------------------------------------------------------------------- diff --git a/test/data/serialization/2.1/gms.EndpointState.bin b/test/data/serialization/2.1/gms.EndpointState.bin new file mode 100644 index 0000000..f87fc77 Binary files /dev/null and b/test/data/serialization/2.1/gms.EndpointState.bin differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/b75ed1c3/test/data/serialization/2.1/gms.Gossip.bin ---------------------------------------------------------------------- diff --git a/test/data/serialization/2.1/gms.Gossip.bin b/test/data/serialization/2.1/gms.Gossip.bin new file mode 100644 index 0000000..af5ac57 Binary files /dev/null and b/test/data/serialization/2.1/gms.Gossip.bin differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/b75ed1c3/test/data/serialization/2.1/service.SyncComplete.bin ---------------------------------------------------------------------- diff --git a/test/data/serialization/2.1/service.SyncComplete.bin b/test/data/serialization/2.1/service.SyncComplete.bin new file mode 100644 index 0000000..533abe2 Binary files /dev/null and b/test/data/serialization/2.1/service.SyncComplete.bin differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/b75ed1c3/test/data/serialization/2.1/service.SyncRequest.bin ---------------------------------------------------------------------- diff --git a/test/data/serialization/2.1/service.SyncRequest.bin b/test/data/serialization/2.1/service.SyncRequest.bin new file mode 100644 index 0000000..2bb8bf9 Binary files /dev/null and b/test/data/serialization/2.1/service.SyncRequest.bin differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/b75ed1c3/test/data/serialization/2.1/service.ValidationComplete.bin ---------------------------------------------------------------------- diff --git a/test/data/serialization/2.1/service.ValidationComplete.bin b/test/data/serialization/2.1/service.ValidationComplete.bin new file mode 100644 index 0000000..6eff48f Binary files /dev/null and b/test/data/serialization/2.1/service.ValidationComplete.bin differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/b75ed1c3/test/data/serialization/2.1/service.ValidationRequest.bin ---------------------------------------------------------------------- diff --git a/test/data/serialization/2.1/service.ValidationRequest.bin b/test/data/serialization/2.1/service.ValidationRequest.bin new file mode 100644 index 0000000..e774d05 Binary files /dev/null and b/test/data/serialization/2.1/service.ValidationRequest.bin differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/b75ed1c3/test/data/serialization/2.1/utils.BloomFilter.bin ---------------------------------------------------------------------- diff --git a/test/data/serialization/2.1/utils.BloomFilter.bin b/test/data/serialization/2.1/utils.BloomFilter.bin new file mode 100644 index 0000000..357042a Binary files /dev/null and b/test/data/serialization/2.1/utils.BloomFilter.bin differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/b75ed1c3/test/data/serialization/2.1/utils.EstimatedHistogram.bin ---------------------------------------------------------------------- diff --git a/test/data/serialization/2.1/utils.EstimatedHistogram.bin b/test/data/serialization/2.1/utils.EstimatedHistogram.bin new file mode 100644 index 0000000..bedd39b Binary files /dev/null and b/test/data/serialization/2.1/utils.EstimatedHistogram.bin differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/b75ed1c3/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java new file mode 100644 index 0000000..164dd62 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.compaction; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ExecutionException; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.Util; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.dht.BytesToken; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.SSTableIdentityIterator; +import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.io.sstable.SSTableScanner; +import org.apache.cassandra.utils.ByteBufferUtil; +import static junit.framework.Assert.assertFalse; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class AntiCompactionTest extends SchemaLoader +{ + private static final String KEYSPACE1 = "Keyspace1"; + private static final String CF = "Standard1"; + + @Test + public void antiCompactOne() throws InterruptedException, ExecutionException, IOException + { + Keyspace keyspace = Keyspace.open(KEYSPACE1); + ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF); + store.disableAutoCompaction(); + long timestamp = System.currentTimeMillis(); + for (int i = 0; i < 10; i++) + { + DecoratedKey key = Util.dk(Integer.toString(i)); + Mutation rm = new Mutation(KEYSPACE1, key.key); + for (int j = 0; j < 10; j++) + rm.add("Standard1", Util.cellname(Integer.toString(j)), + ByteBufferUtil.EMPTY_BYTE_BUFFER, + timestamp, + 0); + rm.apply(); + } + store.forceBlockingFlush(); + Collection<SSTableReader> sstables = store.getUnrepairedSSTables(); + assertEquals(store.getSSTables().size(), sstables.size()); + Range<Token> range = new Range<Token>(new BytesToken("0".getBytes()), new BytesToken("4".getBytes())); + List<Range<Token>> ranges = Arrays.asList(range); + + long repairedAt = 1000; + CompactionManager.instance.performAnticompaction(store, ranges, sstables, repairedAt); + + assertEquals(2, store.getSSTables().size()); + int repairedKeys = 0; + int nonRepairedKeys = 0; + for (SSTableReader sstable : store.getSSTables()) + { + SSTableScanner scanner = sstable.getScanner(); + while (scanner.hasNext()) + { + SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next(); + if (sstable.isRepaired()) + { + assertTrue(range.contains(row.getKey().token)); + repairedKeys++; + } + else + { + assertFalse(range.contains(row.getKey().token)); + nonRepairedKeys++; + } + } + } + assertEquals(repairedKeys, 4); + assertEquals(nonRepairedKeys, 6); + } +}
