Repository: cassandra Updated Branches: refs/heads/cassandra-3.0 811ee6ca2 -> 96d41f0e0
http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/test/unit/org/apache/cassandra/hints/HintMessageTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/hints/HintMessageTest.java b/test/unit/org/apache/cassandra/hints/HintMessageTest.java new file mode 100644 index 0000000..7ffaa54 --- /dev/null +++ b/test/unit/org/apache/cassandra/hints/HintMessageTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.hints; + +import java.io.IOException; +import java.util.UUID; + +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.RowUpdateBuilder; +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.utils.FBUtilities; + +import static junit.framework.Assert.assertEquals; + +import static org.apache.cassandra.hints.HintsTestUtil.assertHintsEqual; +import static org.apache.cassandra.utils.ByteBufferUtil.bytes; + +public class HintMessageTest +{ + private static final String KEYSPACE = "hint_message_test"; + private static final String TABLE = "table"; + + @Test + public void testSerializer() throws IOException + { + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), SchemaLoader.standardCFMD(KEYSPACE, TABLE)); + + UUID hostId = UUID.randomUUID(); + long now = FBUtilities.timestampMicros(); + + CFMetaData table = Schema.instance.getCFMetaData(KEYSPACE, TABLE); + Mutation mutation = + new RowUpdateBuilder(table, now, bytes("key")) + .clustering("column") + .add("val", "val" + 1234) + .build(); + Hint hint = Hint.create(mutation, now / 1000); + HintMessage message = new HintMessage(hostId, hint); + + // serialize + int serializedSize = (int) HintMessage.serializer.serializedSize(message, MessagingService.current_version); + DataOutputBuffer dob = new DataOutputBuffer(); + HintMessage.serializer.serialize(message, dob, MessagingService.current_version); + assertEquals(serializedSize, dob.getLength()); + + // deserialize + DataInputPlus di = new DataInputBuffer(dob.buffer(), true); + HintMessage deserializedMessage = HintMessage.serializer.deserialize(di, MessagingService.current_version); + + // compare before/after + assertEquals(hostId, deserializedMessage.hostId); + assertHintsEqual(message.hint, deserializedMessage.hint); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/test/unit/org/apache/cassandra/hints/HintTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/hints/HintTest.java b/test/unit/org/apache/cassandra/hints/HintTest.java new file mode 100644 index 0000000..c198149 --- /dev/null +++ b/test/unit/org/apache/cassandra/hints/HintTest.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.hints; + +import java.io.IOException; + +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.Util; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.partitions.FilteredPartition; +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.TableParams; +import org.apache.cassandra.utils.FBUtilities; + +import static junit.framework.Assert.*; + +import static org.apache.cassandra.Util.dk; +import static org.apache.cassandra.hints.HintsTestUtil.assertHintsEqual; +import static org.apache.cassandra.hints.HintsTestUtil.assertPartitionsEqual; +import static org.apache.cassandra.utils.ByteBufferUtil.bytes; + +public class HintTest +{ + private static final String KEYSPACE = "hint_test"; + private static final String TABLE0 = "table_0"; + private static final String TABLE1 = "table_1"; + private static final String TABLE2 = "table_2"; + + @BeforeClass + public static void defineSchema() + { + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE, + KeyspaceParams.simple(1), + SchemaLoader.standardCFMD(KEYSPACE, TABLE0), + SchemaLoader.standardCFMD(KEYSPACE, TABLE1), + SchemaLoader.standardCFMD(KEYSPACE, TABLE2)); + } + + @Before + public void resetGcGraceSeconds() + { + for (CFMetaData table : Schema.instance.getTables(KEYSPACE)) + table.gcGraceSeconds(TableParams.DEFAULT_GC_GRACE_SECONDS); + } + + @Test + public void testSerializer() throws IOException + { + long now = FBUtilities.timestampMicros(); + Mutation mutation = createMutation("testSerializer", now); + Hint hint = Hint.create(mutation, now / 1000); + + // serialize + int serializedSize = (int) Hint.serializer.serializedSize(hint, MessagingService.current_version); + DataOutputBuffer dob = new DataOutputBuffer(); + Hint.serializer.serialize(hint, dob, MessagingService.current_version); + assertEquals(serializedSize, dob.getLength()); + + // deserialize + DataInputPlus di = new DataInputBuffer(dob.buffer(), true); + Hint deserializedHint = Hint.serializer.deserialize(di, MessagingService.current_version); + + // compare before/after + assertHintsEqual(hint, deserializedHint); + } + + @Test + public void testApply() + { + long now = FBUtilities.timestampMicros(); + String key = "testApply"; + Mutation mutation = createMutation(key, now); + Hint hint = Hint.create(mutation, now / 1000); + + // sanity check that there is no data inside yet + assertNoPartitions(key, TABLE0); + assertNoPartitions(key, TABLE1); + assertNoPartitions(key, TABLE2); + + hint.apply(); + + // assert that we can read the inserted partitions + for (PartitionUpdate partition : mutation.getPartitionUpdates()) + assertPartitionsEqual(partition, readPartition(key, partition.metadata().cfName)); + } + + @Test + public void testApplyWithTruncation() + { + long now = FBUtilities.timestampMicros(); + String key = "testApplyWithTruncation"; + Mutation mutation = createMutation(key, now); + + // sanity check that there is no data inside yet + assertNoPartitions(key, TABLE0); + assertNoPartitions(key, TABLE1); + assertNoPartitions(key, TABLE2); + + // truncate TABLE1 + Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE1).truncateBlocking(); + + // create and apply a hint with creation time in the past (one second before the truncation) + Hint.create(mutation, now / 1000 - 1).apply(); + + // TABLE1 update should have been skipped and not applied, as expired + assertNoPartitions(key, TABLE1); + + // TABLE0 and TABLE2 updates should have been applied successfully + assertPartitionsEqual(mutation.getPartitionUpdate(Schema.instance.getId(KEYSPACE, TABLE0)), readPartition(key, TABLE0)); + assertPartitionsEqual(mutation.getPartitionUpdate(Schema.instance.getId(KEYSPACE, TABLE2)), readPartition(key, TABLE2)); + } + + @Test + public void testApplyWithRegularExpiration() + { + long now = FBUtilities.timestampMicros(); + String key = "testApplyWithRegularExpiration"; + Mutation mutation = createMutation(key, now); + + // sanity check that there is no data inside yet + assertNoPartitions(key, TABLE0); + assertNoPartitions(key, TABLE1); + assertNoPartitions(key, TABLE2); + + // lower the GC GS on TABLE0 to 0 BEFORE the hint is created + Schema.instance.getCFMetaData(KEYSPACE, TABLE0).gcGraceSeconds(0); + + Hint.create(mutation, now / 1000).apply(); + + // all updates should have been skipped and not applied, as expired + assertNoPartitions(key, TABLE0); + assertNoPartitions(key, TABLE1); + assertNoPartitions(key, TABLE2); + } + + @Test + public void testApplyWithGCGSReducedLater() + { + long now = FBUtilities.timestampMicros(); + String key = "testApplyWithGCGSReducedLater"; + Mutation mutation = createMutation(key, now); + Hint hint = Hint.create(mutation, now / 1000); + + // sanity check that there is no data inside yet + assertNoPartitions(key, TABLE0); + assertNoPartitions(key, TABLE1); + assertNoPartitions(key, TABLE2); + + // lower the GC GS on TABLE0 AFTER the hint is already created + Schema.instance.getCFMetaData(KEYSPACE, TABLE0).gcGraceSeconds(0); + + hint.apply(); + + // all updates should have been skipped and not applied, as expired + assertNoPartitions(key, TABLE0); + assertNoPartitions(key, TABLE1); + assertNoPartitions(key, TABLE2); + } + + private static Mutation createMutation(String key, long now) + { + Mutation mutation = new Mutation(KEYSPACE, dk(key)); + + new RowUpdateBuilder(Schema.instance.getCFMetaData(KEYSPACE, TABLE0), now, mutation) + .clustering("column0") + .add("val", "value0") + .build(); + + new RowUpdateBuilder(Schema.instance.getCFMetaData(KEYSPACE, TABLE1), now + 1, mutation) + .clustering("column1") + .add("val", "value1") + .build(); + + new RowUpdateBuilder(Schema.instance.getCFMetaData(KEYSPACE, TABLE2), now + 2, mutation) + .clustering("column2") + .add("val", "value2") + .build(); + + return mutation; + } + + private static SinglePartitionReadCommand cmd(String key, String table) + { + CFMetaData meta = Schema.instance.getCFMetaData(KEYSPACE, table); + return SinglePartitionReadCommand.fullPartitionRead(meta, FBUtilities.nowInSeconds(), bytes(key)); + } + + private static FilteredPartition readPartition(String key, String table) + { + return Util.getOnlyPartition(cmd(key, table)); + } + + private static void assertNoPartitions(String key, String table) + { + ReadCommand cmd = cmd(key, table); + + try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); + PartitionIterator iterator = cmd.executeInternal(orderGroup)) + { + assertFalse(iterator.hasNext()); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/test/unit/org/apache/cassandra/hints/HintsBufferTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/hints/HintsBufferTest.java b/test/unit/org/apache/cassandra/hints/HintsBufferTest.java new file mode 100644 index 0000000..ebc333a --- /dev/null +++ b/test/unit/org/apache/cassandra/hints/HintsBufferTest.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.hints; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.zip.CRC32; + +import com.google.common.collect.Iterables; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.RowUpdateBuilder; +import org.apache.cassandra.db.rows.Cell; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.KeyspaceParams; + +import static junit.framework.Assert.*; + +import static org.apache.cassandra.utils.ByteBufferUtil.bytes; +import static org.apache.cassandra.utils.FBUtilities.updateChecksum; + +public class HintsBufferTest +{ + private static final String KEYSPACE = "hints_buffer_test"; + private static final String TABLE = "table"; + + private static final int HINTS_COUNT = 300_000; + private static final int HINT_THREADS_COUNT = 10; + private static final int HOST_ID_COUNT = 10; + + @BeforeClass + public static void defineSchema() + { + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), SchemaLoader.standardCFMD(KEYSPACE, TABLE)); + } + + @Test + @SuppressWarnings("resource") + public void testOverlyLargeAllocation() + { + // create a small, 128 bytes buffer + HintsBuffer buffer = HintsBuffer.create(128); + + // try allocating an entry of 65 bytes (53 bytes hint + 12 bytes of overhead) + try + { + buffer.allocate(65 - HintsBuffer.ENTRY_OVERHEAD_SIZE); + fail("Allocation of the buffer should have failed but hasn't"); + } + catch (IllegalArgumentException e) + { + assertEquals(String.format("Hint of %s bytes is too large - the maximum size is 64", 65 - HintsBuffer.ENTRY_OVERHEAD_SIZE), + e.getMessage()); + } + + // assert that a 1-byte smaller allocation fits properly + try (HintsBuffer.Allocation allocation = buffer.allocate(64 - HintsBuffer.ENTRY_OVERHEAD_SIZE)) + { + assertNotNull(allocation); + } + } + + @Test + public void testWrite() throws IOException, InterruptedException + { + // generate 10 random host ids to choose from + UUID[] hostIds = new UUID[HOST_ID_COUNT]; + for (int i = 0; i < hostIds.length; i++) + hostIds[i] = UUID.randomUUID(); + + // map each index to one random UUID from the previously created UUID array + Random random = new Random(System.currentTimeMillis()); + UUID[] load = new UUID[HINTS_COUNT]; + for (int i = 0; i < load.length; i++) + load[i] = hostIds[random.nextInt(HOST_ID_COUNT)]; + + // calculate the size of a single hint (they will all have an equal size in this test) + int hintSize = (int) Hint.serializer.serializedSize(createHint(0, System.currentTimeMillis()), MessagingService.current_version); + int entrySize = hintSize + HintsBuffer.ENTRY_OVERHEAD_SIZE; + + // allocate a slab to fit *precisely* HINTS_COUNT hints + int slabSize = entrySize * HINTS_COUNT; + HintsBuffer buffer = HintsBuffer.create(slabSize); + + // use a fixed timestamp base for all mutation timestamps + long baseTimestamp = System.currentTimeMillis(); + + // create HINT_THREADS_COUNT, start them, and wait for them to finish + List<Thread> threads = new ArrayList<>(HINT_THREADS_COUNT); + for (int i = 0; i < HINT_THREADS_COUNT; i ++) + threads.add(new Thread(new Writer(buffer, load, hintSize, i, baseTimestamp))); + threads.forEach(java.lang.Thread::start); + for (Thread thread : threads) + thread.join(); + + // sanity check that we are full + assertEquals(slabSize, buffer.capacity()); + assertEquals(0, buffer.remaining()); + + // try to allocate more bytes, ensure that the allocation fails + assertNull(buffer.allocate(1)); + + // a failed allocation should automatically close the oporder + buffer.waitForModifications(); + + // a failed allocation should also automatically make the buffer as closed + assertTrue(buffer.isClosed()); + + // assert that host id set in the buffer equals to hostIds + assertEquals(HOST_ID_COUNT, buffer.hostIds().size()); + assertEquals(new HashSet<>(Arrays.asList(hostIds)), buffer.hostIds()); + + // iterate over *every written hint*, validate its content + for (UUID hostId : hostIds) + { + Iterator<ByteBuffer> iter = buffer.consumingHintsIterator(hostId); + while (iter.hasNext()) + { + int idx = validateEntry(hostId, iter.next(), baseTimestamp, load); + load[idx] = null; // nullify each visited entry + } + } + + // assert that all the entries in load array have been visited and nullified + for (UUID hostId : load) + assertNull(hostId); + + // free the buffer + buffer.free(); + } + + private static int validateEntry(UUID hostId, ByteBuffer buffer, long baseTimestamp, UUID[] load) throws IOException + { + CRC32 crc = new CRC32(); + DataInputPlus di = new DataInputBuffer(buffer, true); + + // read and validate size + int hintSize = di.readInt(); + assertEquals(hintSize + HintsBuffer.ENTRY_OVERHEAD_SIZE, buffer.remaining()); + + // read and validate size crc + updateChecksum(crc, buffer, buffer.position(), 4); + assertEquals((int) crc.getValue(), di.readInt()); + + // read the hint and update/validate overall crc + Hint hint = Hint.serializer.deserialize(di, MessagingService.current_version); + updateChecksum(crc, buffer, buffer.position() + 8, hintSize); + assertEquals((int) crc.getValue(), di.readInt()); + + // further validate hint correctness + int idx = (int) (hint.creationTime - baseTimestamp); + assertEquals(hostId, load[idx]); + + Row row = hint.mutation.getPartitionUpdates().iterator().next().iterator().next(); + assertEquals(1, Iterables.size(row.cells())); + + assertEquals(bytes(idx), row.clustering().get(0)); + Cell cell = row.cells().iterator().next(); + assertEquals(TimeUnit.MILLISECONDS.toMicros(baseTimestamp + idx), cell.timestamp()); + assertEquals(bytes(idx), cell.value()); + + return idx; + } + + private static Hint createHint(int idx, long baseTimestamp) + { + long timestamp = baseTimestamp + idx; + return Hint.create(createMutation(idx, TimeUnit.MILLISECONDS.toMicros(timestamp)), timestamp); + } + + private static Mutation createMutation(int index, long timestamp) + { + CFMetaData table = Schema.instance.getCFMetaData(KEYSPACE, TABLE); + return new RowUpdateBuilder(table, timestamp, bytes(index)) + .clustering(bytes(index)) + .add("val", bytes(index)) + .build(); + } + + static class Writer implements Runnable + { + final HintsBuffer buffer; + final UUID[] load; + final int hintSize; + final int index; + final long baseTimestamp; + + Writer(HintsBuffer buffer, UUID[] load, int hintSize, int index, long baseTimestamp) + { + this.buffer = buffer; + this.load = load; + this.hintSize = hintSize; + this.index = index; + this.baseTimestamp = baseTimestamp; + } + + public void run() + { + int hintsPerThread = HINTS_COUNT / HINT_THREADS_COUNT; + for (int i = index * hintsPerThread; i < (index + 1) * hintsPerThread; i++) + { + try (HintsBuffer.Allocation allocation = buffer.allocate(hintSize)) + { + Hint hint = createHint(i, baseTimestamp); + allocation.write(Collections.singleton(load[i]), hint); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java b/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java new file mode 100644 index 0000000..d627fcf --- /dev/null +++ b/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.hints; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.*; + +import org.junit.Test; + +import static junit.framework.Assert.*; + +public class HintsCatalogTest +{ + @Test + public void loadCompletenessAndOrderTest() throws IOException + { + File directory = Files.createTempDirectory(null).toFile(); + try + { + loadCompletenessAndOrderTest(directory); + } + finally + { + directory.deleteOnExit(); + } + } + + public static void loadCompletenessAndOrderTest(File directory) throws IOException + { + UUID hostId1 = UUID.randomUUID(); + UUID hostId2 = UUID.randomUUID(); + + long timestamp1 = System.currentTimeMillis(); + long timestamp2 = System.currentTimeMillis() + 1; + long timestamp3 = System.currentTimeMillis() + 2; + long timestamp4 = System.currentTimeMillis() + 3; + + HintsDescriptor descriptor1 = new HintsDescriptor(hostId1, timestamp1); + HintsDescriptor descriptor2 = new HintsDescriptor(hostId2, timestamp3); + HintsDescriptor descriptor3 = new HintsDescriptor(hostId2, timestamp2); + HintsDescriptor descriptor4 = new HintsDescriptor(hostId1, timestamp4); + + writeDescriptor(directory, descriptor1); + writeDescriptor(directory, descriptor2); + writeDescriptor(directory, descriptor3); + writeDescriptor(directory, descriptor4); + + HintsCatalog catalog = HintsCatalog.load(directory); + assertEquals(2, catalog.stores().count()); + + HintsStore store1 = catalog.get(hostId1); + assertNotNull(store1); + assertEquals(descriptor1, store1.poll()); + assertEquals(descriptor4, store1.poll()); + assertNull(store1.poll()); + + HintsStore store2 = catalog.get(hostId2); + assertNotNull(store2); + assertEquals(descriptor3, store2.poll()); + assertEquals(descriptor2, store2.poll()); + assertNull(store2.poll()); + } + + @SuppressWarnings("EmptyTryBlock") + private static void writeDescriptor(File directory, HintsDescriptor descriptor) throws IOException + { + try (HintsWriter ignored = HintsWriter.create(directory, descriptor)) + { + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/test/unit/org/apache/cassandra/hints/HintsDescriptorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/hints/HintsDescriptorTest.java b/test/unit/org/apache/cassandra/hints/HintsDescriptorTest.java new file mode 100644 index 0000000..08487d1 --- /dev/null +++ b/test/unit/org/apache/cassandra/hints/HintsDescriptorTest.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.hints; + +import java.io.DataInput; +import java.io.File; +import java.io.IOException; +import java.util.UUID; + +import com.google.common.collect.ImmutableMap; +import com.google.common.io.ByteStreams; +import com.google.common.io.Files; +import org.junit.Test; + +import org.apache.cassandra.io.compress.LZ4Compressor; +import org.apache.cassandra.io.util.DataOutputBuffer; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertNotSame; +import static junit.framework.Assert.fail; + +public class HintsDescriptorTest +{ + @Test + public void testSerializerNormal() throws IOException + { + UUID hostId = UUID.randomUUID(); + int version = HintsDescriptor.CURRENT_VERSION; + long timestamp = System.currentTimeMillis(); + ImmutableMap<String, Object> parameters = + ImmutableMap.of("compression", (Object) ImmutableMap.of("class_name", LZ4Compressor.class.getName())); + HintsDescriptor descriptor = new HintsDescriptor(hostId, version, timestamp, parameters); + + testSerializeDeserializeLoop(descriptor); + } + + @Test + public void testSerializerWithEmptyParameters() throws IOException + { + UUID hostId = UUID.randomUUID(); + int version = HintsDescriptor.CURRENT_VERSION; + long timestamp = System.currentTimeMillis(); + ImmutableMap<String, Object> parameters = ImmutableMap.of(); + HintsDescriptor descriptor = new HintsDescriptor(hostId, version, timestamp, parameters); + + testSerializeDeserializeLoop(descriptor); + } + + @Test + public void testCorruptedDeserialize() throws IOException + { + UUID hostId = UUID.randomUUID(); + int version = HintsDescriptor.CURRENT_VERSION; + long timestamp = System.currentTimeMillis(); + ImmutableMap<String, Object> parameters = ImmutableMap.of(); + HintsDescriptor descriptor = new HintsDescriptor(hostId, version, timestamp, parameters); + + byte[] bytes = serializeDescriptor(descriptor); + + // mess up the parameters size + bytes[28] = (byte) 0xFF; + bytes[29] = (byte) 0xFF; + bytes[30] = (byte) 0xFF; + bytes[31] = (byte) 0x7F; + + // attempt to deserialize + try + { + deserializeDescriptor(bytes); + fail("Deserializing the descriptor should but didn't"); + } + catch (IOException e) + { + assertEquals("Hints Descriptor CRC Mismatch", e.getMessage()); + } + } + + @Test + @SuppressWarnings("EmptyTryBlock") + public void testReadFromFile() throws IOException + { + UUID hostId = UUID.randomUUID(); + int version = HintsDescriptor.CURRENT_VERSION; + long timestamp = System.currentTimeMillis(); + ImmutableMap<String, Object> parameters = ImmutableMap.of(); + HintsDescriptor expected = new HintsDescriptor(hostId, version, timestamp, parameters); + + File directory = Files.createTempDir(); + try + { + try (HintsWriter ignored = HintsWriter.create(directory, expected)) + { + } + HintsDescriptor actual = HintsDescriptor.readFromFile(new File(directory, expected.fileName()).toPath()); + assertEquals(expected, actual); + } + finally + { + directory.deleteOnExit(); + } + } + + private static void testSerializeDeserializeLoop(HintsDescriptor descriptor) throws IOException + { + // serialize to a byte array + byte[] bytes = serializeDescriptor(descriptor); + // make sure the sizes match + assertEquals(bytes.length, descriptor.serializedSize()); + // deserialize back + HintsDescriptor deserializedDescriptor = deserializeDescriptor(bytes); + // compare equality + assertDescriptorsEqual(descriptor, deserializedDescriptor); + } + + private static byte[] serializeDescriptor(HintsDescriptor descriptor) throws IOException + { + DataOutputBuffer dob = new DataOutputBuffer(); + descriptor.serialize(dob); + return dob.toByteArray(); + } + + private static HintsDescriptor deserializeDescriptor(byte[] bytes) throws IOException + { + DataInput in = ByteStreams.newDataInput(bytes); + return HintsDescriptor.deserialize(in); + } + + private static void assertDescriptorsEqual(HintsDescriptor expected, HintsDescriptor actual) + { + assertNotSame(expected, actual); + assertEquals(expected, actual); + assertEquals(expected.hashCode(), actual.hashCode()); + assertEquals(expected.hostId, actual.hostId); + assertEquals(expected.version, actual.version); + assertEquals(expected.timestamp, actual.timestamp); + assertEquals(expected.parameters, actual.parameters); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/test/unit/org/apache/cassandra/hints/HintsTestUtil.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/hints/HintsTestUtil.java b/test/unit/org/apache/cassandra/hints/HintsTestUtil.java new file mode 100644 index 0000000..89b532f --- /dev/null +++ b/test/unit/org/apache/cassandra/hints/HintsTestUtil.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.hints; + +import java.util.UUID; + +import com.google.common.collect.Iterators; + +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.partitions.AbstractBTreePartition; +import org.apache.cassandra.db.partitions.PartitionUpdate; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertTrue; + +final class HintsTestUtil +{ + static void assertMutationsEqual(Mutation expected, Mutation actual) + { + assertEquals(expected.key(), actual.key()); + assertEquals(expected.getPartitionUpdates().size(), actual.getPartitionUpdates().size()); + + for (UUID id : expected.getColumnFamilyIds()) + assertPartitionsEqual(expected.getPartitionUpdate(id), actual.getPartitionUpdate(id)); + } + + static void assertPartitionsEqual(AbstractBTreePartition expected, AbstractBTreePartition actual) + { + assertEquals(expected.partitionKey(), actual.partitionKey()); + assertEquals(expected.deletionInfo(), actual.deletionInfo()); + assertEquals(expected.columns(), actual.columns()); + assertTrue(Iterators.elementsEqual(expected.iterator(), actual.iterator())); + } + + static void assertHintsEqual(Hint expected, Hint actual) + { + assertEquals(expected.mutation.getKeyspaceName(), actual.mutation.getKeyspaceName()); + assertEquals(expected.mutation.key(), actual.mutation.key()); + assertEquals(expected.mutation.getColumnFamilyIds(), actual.mutation.getColumnFamilyIds()); + for (PartitionUpdate partitionUpdate : expected.mutation.getPartitionUpdates()) + assertPartitionsEqual(partitionUpdate, actual.mutation.getPartitionUpdate(partitionUpdate.metadata().cfId)); + assertEquals(expected.creationTime, actual.creationTime); + assertEquals(expected.gcgs, actual.gcgs); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/test/unit/org/apache/cassandra/hints/LegacyHintsMigratorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/hints/LegacyHintsMigratorTest.java b/test/unit/org/apache/cassandra/hints/LegacyHintsMigratorTest.java new file mode 100644 index 0000000..85e4b69 --- /dev/null +++ b/test/unit/org/apache/cassandra/hints/LegacyHintsMigratorTest.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.hints; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.util.*; + +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.marshal.UUIDType; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.BTreeRow; +import org.apache.cassandra.db.rows.BufferCell; +import org.apache.cassandra.db.rows.Cell; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.UUIDGen; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertNotNull; +import static junit.framework.Assert.assertTrue; + +import static org.apache.cassandra.hints.HintsTestUtil.assertMutationsEqual; +import static org.apache.cassandra.utils.ByteBufferUtil.bytes; + +// TODO: test split into several files +@SuppressWarnings("deprecation") +public class LegacyHintsMigratorTest +{ + private static final String KEYSPACE = "legacy_hints_migrator_test"; + private static final String TABLE = "table"; + + @BeforeClass + public static void defineSchema() + { + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), SchemaLoader.standardCFMD(KEYSPACE, TABLE)); + } + + @Test + public void testNothingToMigrate() throws IOException + { + File directory = Files.createTempDirectory(null).toFile(); + try + { + testNothingToMigrate(directory); + } + finally + { + directory.deleteOnExit(); + } + } + + private static void testNothingToMigrate(File directory) + { + // truncate system.hints to enseure nothing inside + Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_HINTS).truncateBlocking(); + new LegacyHintsMigrator(directory, 128 * 1024 * 1024).migrate(); + HintsCatalog catalog = HintsCatalog.load(directory); + assertEquals(0, catalog.stores().count()); + } + + @Test + public void testMigrationIsComplete() throws IOException + { + File directory = Files.createTempDirectory(null).toFile(); + try + { + testMigrationIsComplete(directory); + } + finally + { + directory.deleteOnExit(); + } + } + + private static void testMigrationIsComplete(File directory) + { + long timestamp = System.currentTimeMillis(); + + // write 100 mutations for each of the 10 generated endpoints + Map<UUID, Queue<Mutation>> mutations = new HashMap<>(); + for (int i = 0; i < 10; i++) + { + UUID hostId = UUID.randomUUID(); + Queue<Mutation> queue = new LinkedList<>(); + mutations.put(hostId, queue); + + for (int j = 0; j < 100; j++) + { + Mutation mutation = createMutation(j, timestamp + j); + queue.offer(mutation); + Mutation legacyHint = createLegacyHint(mutation, timestamp, hostId); + legacyHint.applyUnsafe(); + } + } + + // run the migration + new LegacyHintsMigrator(directory, 128 * 1024 * 1024).migrate(); + + // validate that the hints table is truncated now + assertTrue(Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_HINTS).isEmpty()); + + HintsCatalog catalog = HintsCatalog.load(directory); + + // assert that we've correctly loaded 10 hints stores + assertEquals(10, catalog.stores().count()); + + // for each of the 10 stores, make sure the mutations have been migrated correctly + for (Map.Entry<UUID, Queue<Mutation>> entry : mutations.entrySet()) + { + HintsStore store = catalog.get(entry.getKey()); + assertNotNull(store); + + HintsDescriptor descriptor = store.poll(); + assertNotNull(descriptor); + + // read all the hints + Queue<Hint> actualHints = new LinkedList<>(); + try (HintsReader reader = HintsReader.open(new File(directory, descriptor.fileName()))) + { + for (HintsReader.Page page : reader) + page.hintsIterator().forEachRemaining(actualHints::offer); + } + + // assert the size matches + assertEquals(100, actualHints.size()); + + // compare expected hints to actual hints + for (int i = 0; i < 100; i++) + { + Hint hint = actualHints.poll(); + Mutation mutation = entry.getValue().poll(); + int ttl = mutation.smallestGCGS(); + + assertEquals(timestamp, hint.creationTime); + assertEquals(ttl, hint.gcgs); + assertMutationsEqual(mutation, hint.mutation); + } + } + } + + // legacy hint mutation creation code, copied more or less verbatim from the previous implementation + private static Mutation createLegacyHint(Mutation mutation, long now, UUID targetId) + { + int version = MessagingService.VERSION_21; + int ttl = mutation.smallestGCGS(); + UUID hintId = UUIDGen.getTimeUUID(); + + ByteBuffer key = UUIDType.instance.decompose(targetId); + Clustering clustering = SystemKeyspace.LegacyHints.comparator.make(hintId, version); + ByteBuffer value = ByteBuffer.wrap(FBUtilities.serialize(mutation, Mutation.serializer, version)); + Cell cell = BufferCell.expiring(SystemKeyspace.LegacyHints.compactValueColumn(), + now, + ttl, + FBUtilities.nowInSeconds(), + value); + return new Mutation(PartitionUpdate.singleRowUpdate(SystemKeyspace.LegacyHints, + key, + BTreeRow.singleCellRow(clustering, cell))); + } + + private static Mutation createMutation(int index, long timestamp) + { + CFMetaData table = Schema.instance.getCFMetaData(KEYSPACE, TABLE); + return new RowUpdateBuilder(table, timestamp, bytes(index)) + .clustering(bytes(index)) + .add("val", bytes(index)) + .build(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/test/unit/org/apache/cassandra/metrics/HintedHandOffMetricsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/metrics/HintedHandOffMetricsTest.java b/test/unit/org/apache/cassandra/metrics/HintedHandOffMetricsTest.java new file mode 100644 index 0000000..6f76db4 --- /dev/null +++ b/test/unit/org/apache/cassandra/metrics/HintedHandOffMetricsTest.java @@ -0,0 +1,56 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.cassandra.metrics; + +import java.net.InetAddress; +import java.util.Map; +import java.util.UUID; + +import org.junit.Test; + +import com.google.common.collect.Iterators; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.UntypedResultSet; +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.db.marshal.UUIDType; +import org.apache.cassandra.hints.HintsService; + +import static org.junit.Assert.assertEquals; +import static org.apache.cassandra.cql3.QueryProcessor.executeInternal; + +public class HintedHandOffMetricsTest +{ + @Test + public void testHintsMetrics() throws Exception + { + DatabaseDescriptor.getHintsDirectory().mkdirs(); + + for (int i = 0; i < 99; i++) + HintsService.instance.metrics.incrPastWindow(InetAddress.getLocalHost()); + HintsService.instance.metrics.log(); + + UntypedResultSet rows = executeInternal("SELECT hints_dropped FROM system." + SystemKeyspace.PEER_EVENTS); + Map<UUID, Integer> returned = rows.one().getMap("hints_dropped", UUIDType.instance, Int32Type.instance); + assertEquals(Iterators.getLast(returned.values().iterator()).intValue(), 99); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/test/unit/org/apache/cassandra/service/StorageProxyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/StorageProxyTest.java b/test/unit/org/apache/cassandra/service/StorageProxyTest.java index 801fc53..42eb1f5 100644 --- a/test/unit/org/apache/cassandra/service/StorageProxyTest.java +++ b/test/unit/org/apache/cassandra/service/StorageProxyTest.java @@ -25,9 +25,9 @@ import org.junit.BeforeClass; import org.junit.Test; import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.dht.*; import org.apache.cassandra.locator.TokenMetadata; -import org.apache.cassandra.utils.ByteBufferUtil; import static org.apache.cassandra.Util.rp; import static org.apache.cassandra.Util.token; @@ -78,6 +78,7 @@ public class StorageProxyTest @BeforeClass public static void beforeClass() throws Throwable { + DatabaseDescriptor.getHintsDirectory().mkdir(); TokenMetadata tmd = StorageService.instance.getTokenMetadata(); tmd.updateNormalToken(token("1"), InetAddress.getByName("127.0.0.1")); tmd.updateNormalToken(token("6"), InetAddress.getByName("127.0.0.6"));
