Fix Digest mismatch Exception if hints file has UnknownColumnFamily

Patch by Jay Zhuang; Reviewed by Jeff Jirsa for CASSANDRA-13696


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f919cf4a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f919cf4a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f919cf4a

Branch: refs/heads/cassandra-3.11
Commit: f919cf4a478cdbcb7864e8b47814a40bfcb343a7
Parents: 19adec1
Author: Jeff Jirsa <jji...@apple.com>
Authored: Wed Jul 26 16:56:11 2017 -0700
Committer: Jeff Jirsa <jji...@apple.com>
Committed: Wed Jul 26 16:56:11 2017 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/hints/ChecksummedDataInput.java   |   2 +-
 .../apache/cassandra/hints/HintsDescriptor.java |   2 +-
 .../org/apache/cassandra/hints/HintsReader.java |   2 +-
 .../apache/cassandra/hints/HintsReaderTest.java | 172 +++++++++++++++++++
 5 files changed, 176 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f919cf4a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3247277..5e6d189 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,7 @@
 3.0.15
  * Fixed ambiguous output of nodetool tablestats command (CASSANDRA-13722)
  * JMXEnabledThreadPoolExecutor with corePoolSize equal to maxPoolSize 
(Backport CASSANDRA-13329)
+ * Fix Digest mismatch Exception if hints file has UnknownColumnFamily 
(CASSANDRA-13696)
  * Purge tombstones created by expired cells (CASSANDRA-13643)
  * Make concat work with iterators that have different subsets of columns 
(CASSANDRA-13482)
  * Set test.runners based on cores and memory size (CASSANDRA-13078)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f919cf4a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java 
b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
index 095d7f4..39f46a4 100644
--- a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
+++ b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
@@ -107,7 +107,7 @@ public class ChecksummedDataInput extends 
RandomAccessReader.RandomAccessReaderW
         {
             updateCrc();
 
-            // we must diable crc updates in case we rebuffer
+            // we must disable crc updates in case we rebuffer
             // when called source.readInt()
             crcUpdateDisabled = true;
             return ((int) crc.getValue()) == readInt();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f919cf4a/src/java/org/apache/cassandra/hints/HintsDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDescriptor.java 
b/src/java/org/apache/cassandra/hints/HintsDescriptor.java
index f5296b3..916da4e 100644
--- a/src/java/org/apache/cassandra/hints/HintsDescriptor.java
+++ b/src/java/org/apache/cassandra/hints/HintsDescriptor.java
@@ -120,7 +120,7 @@ final class HintsDescriptor
         switch (hintsVersion)
         {
             case VERSION_30:
-                return MessagingService.VERSION_30;
+                return MessagingService.FORCE_3_0_PROTOCOL_VERSION ? 
MessagingService.VERSION_30 : MessagingService.VERSION_3014;
             default:
                 throw new AssertionError();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f919cf4a/src/java/org/apache/cassandra/hints/HintsReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsReader.java 
b/src/java/org/apache/cassandra/hints/HintsReader.java
index 8104051..973a15e 100644
--- a/src/java/org/apache/cassandra/hints/HintsReader.java
+++ b/src/java/org/apache/cassandra/hints/HintsReader.java
@@ -239,7 +239,7 @@ class HintsReader implements AutoCloseable, 
Iterable<HintsReader.Page>
                             descriptor.fileName());
                 input.skipBytes(Ints.checkedCast(size - 
input.bytesPastLimit()));
 
-                return null;
+                hint = null; // set the return value to null and let the following 
code update/check the CRC
             }
 
             if (input.checkCrc())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f919cf4a/test/unit/org/apache/cassandra/hints/HintsReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/HintsReaderTest.java 
b/test/unit/org/apache/cassandra/hints/HintsReaderTest.java
new file mode 100644
index 0000000..70cf6e7
--- /dev/null
+++ b/test/unit/org/apache/cassandra/hints/HintsReaderTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.hints;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.Iterator;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Iterables;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.schema.KeyspaceParams;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertNotNull;
+import static org.apache.cassandra.Util.dk;
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
+public class HintsReaderTest
+{
+    private static final String CF_STANDARD1 = "Standard1";
+    private static final String CF_STANDARD2 = "Standard2";
+
+    private static HintsDescriptor descriptor;
+
+    private static File directory;
+
+    @BeforeClass
+    public static void defineSchema() throws Exception
+    {
+        SchemaLoader.prepareServer();
+
+        descriptor = new HintsDescriptor(UUID.randomUUID(), 
System.currentTimeMillis());
+    }
+
+    private static Mutation createMutation(int index, long timestamp, String 
ks, String tb)
+    {
+        CFMetaData table = Schema.instance.getCFMetaData(ks, tb);
+        return new RowUpdateBuilder(table, timestamp, bytes(index))
+               .clustering(bytes(index))
+               .add("val", bytes(index))
+               .build();
+    }
+
+    private void generateHints(int num, String ks) throws IOException
+    {
+        try (HintsWriter writer = HintsWriter.create(directory, descriptor))
+        {
+            ByteBuffer buffer = ByteBuffer.allocateDirect(256 * 1024);
+            try (HintsWriter.Session session = writer.newSession(buffer))
+            {
+                for (int i = 0; i < num; i++)
+                {
+                    long timestamp = descriptor.timestamp + i;
+                    Mutation m = createMutation(i, 
TimeUnit.MILLISECONDS.toMicros(timestamp), ks, CF_STANDARD1);
+                    session.append(Hint.create(m, timestamp));
+                    m = createMutation(i, 
TimeUnit.MILLISECONDS.toMicros(timestamp), ks, CF_STANDARD2);
+                    session.append(Hint.create(m, timestamp));
+                }
+            }
+            FileUtils.clean(buffer);
+        }
+    }
+
+    private void readHints(int num, int numTable) throws IOException
+    {
+        long baseTimestamp = descriptor.timestamp;
+        int index = 0;
+
+        try (HintsReader reader = HintsReader.open(new File(directory, 
descriptor.fileName())))
+        {
+            for (HintsReader.Page page : reader)
+            {
+                Iterator<Hint> hints = page.hintsIterator();
+                while (hints.hasNext())
+                {
+                    int i = index / numTable;
+                    Hint hint = hints.next();
+
+                    long timestamp = baseTimestamp + i;
+                    Mutation mutation = hint.mutation;
+
+                    assertEquals(timestamp, hint.creationTime);
+                    assertEquals(dk(bytes(i)), mutation.key());
+
+                    Row row = 
mutation.getPartitionUpdates().iterator().next().iterator().next();
+                    assertEquals(1, Iterables.size(row.cells()));
+                    assertEquals(bytes(i), row.clustering().get(0));
+                    Cell cell = row.cells().iterator().next();
+                    assertNotNull(cell);
+                    assertEquals(bytes(i), cell.value());
+                    assertEquals(timestamp * 1000, cell.timestamp());
+
+                    index++;
+                }
+            }
+        }
+
+        assertEquals(index, num);
+    }
+
+    @Test
+    public void testNormalRead() throws IOException
+    {
+        String ks = "testNormalRead";
+        SchemaLoader.createKeyspace(ks,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(ks, 
CF_STANDARD1),
+                                    SchemaLoader.standardCFMD(ks, 
CF_STANDARD2));
+        int numTable = 2;
+        directory = Files.createTempDirectory(null).toFile();
+        try
+        {
+            generateHints(3, ks);
+            readHints(3 * numTable, numTable);
+        }
+        finally
+        {
+            directory.delete();
+        }
+    }
+
+    @Test
+    public void testDroppedTableRead() throws IOException
+    {
+        String ks = "testDroppedTableRead";
+        SchemaLoader.createKeyspace(ks,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(ks, 
CF_STANDARD1),
+                                    SchemaLoader.standardCFMD(ks, 
CF_STANDARD2));
+        directory = Files.createTempDirectory(null).toFile();
+        try
+        {
+            generateHints(3, ks);
+            Schema.instance.dropTable(ks, CF_STANDARD1);
+            readHints(3, 1);
+        }
+        finally
+        {
+            directory.delete();
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to