Fix Digest mismatch Exception if hints file has UnknownColumnFamily. Patch by Jay Zhuang; reviewed by Jeff Jirsa for CASSANDRA-13696.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f919cf4a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f919cf4a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f919cf4a Branch: refs/heads/cassandra-3.11 Commit: f919cf4a478cdbcb7864e8b47814a40bfcb343a7 Parents: 19adec1 Author: Jeff Jirsa <jji...@apple.com> Authored: Wed Jul 26 16:56:11 2017 -0700 Committer: Jeff Jirsa <jji...@apple.com> Committed: Wed Jul 26 16:56:11 2017 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/hints/ChecksummedDataInput.java | 2 +- .../apache/cassandra/hints/HintsDescriptor.java | 2 +- .../org/apache/cassandra/hints/HintsReader.java | 2 +- .../apache/cassandra/hints/HintsReaderTest.java | 172 +++++++++++++++++++ 5 files changed, 176 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f919cf4a/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 3247277..5e6d189 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,6 +1,7 @@ 3.0.15 * Fixed ambiguous output of nodetool tablestats command (CASSANDRA-13722) * JMXEnabledThreadPoolExecutor with corePoolSize equal to maxPoolSize (Backport CASSANDRA-13329) + * Fix Digest mismatch Exception if hints file has UnknownColumnFamily (CASSANDRA-13696) * Purge tombstones created by expired cells (CASSANDRA-13643) * Make concat work with iterators that have different subsets of columns (CASSANDRA-13482) * Set test.runners based on cores and memory size (CASSANDRA-13078) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f919cf4a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java index 095d7f4..39f46a4 100644 --- a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java +++ b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java @@ -107,7 +107,7 @@ public class ChecksummedDataInput extends RandomAccessReader.RandomAccessReaderW { updateCrc(); - // we must diable crc updates in case we rebuffer + // we must disable crc updates in case we rebuffer // when called source.readInt() crcUpdateDisabled = true; return ((int) crc.getValue()) == readInt(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f919cf4a/src/java/org/apache/cassandra/hints/HintsDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintsDescriptor.java b/src/java/org/apache/cassandra/hints/HintsDescriptor.java index f5296b3..916da4e 100644 --- a/src/java/org/apache/cassandra/hints/HintsDescriptor.java +++ b/src/java/org/apache/cassandra/hints/HintsDescriptor.java @@ -120,7 +120,7 @@ final class HintsDescriptor switch (hintsVersion) { case VERSION_30: - return MessagingService.VERSION_30; + return MessagingService.FORCE_3_0_PROTOCOL_VERSION ? 
MessagingService.VERSION_30 : MessagingService.VERSION_3014; default: throw new AssertionError(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f919cf4a/src/java/org/apache/cassandra/hints/HintsReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintsReader.java b/src/java/org/apache/cassandra/hints/HintsReader.java index 8104051..973a15e 100644 --- a/src/java/org/apache/cassandra/hints/HintsReader.java +++ b/src/java/org/apache/cassandra/hints/HintsReader.java @@ -239,7 +239,7 @@ class HintsReader implements AutoCloseable, Iterable<HintsReader.Page> descriptor.fileName()); input.skipBytes(Ints.checkedCast(size - input.bytesPastLimit())); - return null; + hint = null; // set the return value to null and let following code to update/check the CRC } if (input.checkCrc()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f919cf4a/test/unit/org/apache/cassandra/hints/HintsReaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/hints/HintsReaderTest.java b/test/unit/org/apache/cassandra/hints/HintsReaderTest.java new file mode 100644 index 0000000..70cf6e7 --- /dev/null +++ b/test/unit/org/apache/cassandra/hints/HintsReaderTest.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.hints; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.util.Iterator; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import com.google.common.collect.Iterables; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.RowUpdateBuilder; +import org.apache.cassandra.db.rows.Cell; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.schema.KeyspaceParams; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertNotNull; +import static org.apache.cassandra.Util.dk; +import static org.apache.cassandra.utils.ByteBufferUtil.bytes; + +public class HintsReaderTest +{ + private static final String CF_STANDARD1 = "Standard1"; + private static final String CF_STANDARD2 = "Standard2"; + + private static HintsDescriptor descriptor; + + private static File directory; + + @BeforeClass + public static void defineSchema() throws Exception + { + SchemaLoader.prepareServer(); + + descriptor = new HintsDescriptor(UUID.randomUUID(), System.currentTimeMillis()); + } + + private static Mutation createMutation(int index, long timestamp, String ks, String tb) + { + CFMetaData table = Schema.instance.getCFMetaData(ks, tb); + return new 
RowUpdateBuilder(table, timestamp, bytes(index)) + .clustering(bytes(index)) + .add("val", bytes(index)) + .build(); + } + + private void generateHints(int num, String ks) throws IOException + { + try (HintsWriter writer = HintsWriter.create(directory, descriptor)) + { + ByteBuffer buffer = ByteBuffer.allocateDirect(256 * 1024); + try (HintsWriter.Session session = writer.newSession(buffer)) + { + for (int i = 0; i < num; i++) + { + long timestamp = descriptor.timestamp + i; + Mutation m = createMutation(i, TimeUnit.MILLISECONDS.toMicros(timestamp), ks, CF_STANDARD1); + session.append(Hint.create(m, timestamp)); + m = createMutation(i, TimeUnit.MILLISECONDS.toMicros(timestamp), ks, CF_STANDARD2); + session.append(Hint.create(m, timestamp)); + } + } + FileUtils.clean(buffer); + } + } + + private void readHints(int num, int numTable) throws IOException + { + long baseTimestamp = descriptor.timestamp; + int index = 0; + + try (HintsReader reader = HintsReader.open(new File(directory, descriptor.fileName()))) + { + for (HintsReader.Page page : reader) + { + Iterator<Hint> hints = page.hintsIterator(); + while (hints.hasNext()) + { + int i = index / numTable; + Hint hint = hints.next(); + + long timestamp = baseTimestamp + i; + Mutation mutation = hint.mutation; + + assertEquals(timestamp, hint.creationTime); + assertEquals(dk(bytes(i)), mutation.key()); + + Row row = mutation.getPartitionUpdates().iterator().next().iterator().next(); + assertEquals(1, Iterables.size(row.cells())); + assertEquals(bytes(i), row.clustering().get(0)); + Cell cell = row.cells().iterator().next(); + assertNotNull(cell); + assertEquals(bytes(i), cell.value()); + assertEquals(timestamp * 1000, cell.timestamp()); + + index++; + } + } + } + + assertEquals(index, num); + } + + @Test + public void testNormalRead() throws IOException + { + String ks = "testNormalRead"; + SchemaLoader.createKeyspace(ks, + KeyspaceParams.simple(1), + SchemaLoader.standardCFMD(ks, CF_STANDARD1), + 
SchemaLoader.standardCFMD(ks, CF_STANDARD2)); + int numTable = 2; + directory = Files.createTempDirectory(null).toFile(); + try + { + generateHints(3, ks); + readHints(3 * numTable, numTable); + } + finally + { + directory.delete(); + } + } + + @Test + public void testDroppedTableRead() throws IOException + { + String ks = "testDroppedTableRead"; + SchemaLoader.createKeyspace(ks, + KeyspaceParams.simple(1), + SchemaLoader.standardCFMD(ks, CF_STANDARD1), + SchemaLoader.standardCFMD(ks, CF_STANDARD2)); + directory = Files.createTempDirectory(null).toFile(); + try + { + generateHints(3, ks); + Schema.instance.dropTable(ks, CF_STANDARD1); + readHints(3, 1); + } + finally + { + directory.delete(); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org