Repository: cassandra
Updated Branches:
  refs/heads/trunk 01ade6f25 -> f8159ac50


Bring back maxHintTTL property

Patch by Blake Eggleston, reviewed by Aleksey Yeschenko for CASSANDRA-12982


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f8159ac5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f8159ac5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f8159ac5

Branch: refs/heads/trunk
Commit: f8159ac5071d53e8bc9c28e3a1eaf3ba798ff287
Parents: 01ade6f
Author: Blake Eggleston <[email protected]>
Authored: Tue Nov 8 14:11:47 2016 -0800
Committer: Blake Eggleston <[email protected]>
Committed: Tue May 16 14:37:33 2017 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 conf/jvm.options                                |   3 +
 src/java/org/apache/cassandra/hints/Hint.java   |  82 ++++++++-
 .../org/apache/cassandra/hints/HintsReader.java |   7 +-
 .../org/apache/cassandra/hints/HintsWriter.java |   6 +
 .../io/util/RebufferingInputStream.java         |   2 +-
 .../apache/cassandra/utils/vint/VIntCoding.java |   1 +
 .../cassandra/hints/HintWriteTTLTest.java       | 169 +++++++++++++++++++
 8 files changed, 263 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8159ac5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 03870dd..9d1323d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Bring back maxHintTTL property (CASSANDRA-12982)
  * Add testing guidelines (CASSANDRA-13497)
  * Add more repair metrics (CASSANDRA-13531)
  * RangeStreamer should be smarter when picking endpoints for streaming 
(CASSANDRA-4650)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8159ac5/conf/jvm.options
----------------------------------------------------------------------
diff --git a/conf/jvm.options b/conf/jvm.options
index 49b2196..398b52f 100644
--- a/conf/jvm.options
+++ b/conf/jvm.options
@@ -79,6 +79,9 @@
 # 10000 rows per page.
 #-Dcassandra.force_default_indexing_page_size=true
 
+# Imposes an upper bound on hint lifetime below the normal min gc_grace_seconds
+#-Dcassandra.maxHintTTL=max_hint_ttl_in_seconds
+
 ########################
 # GENERAL JVM SETTINGS #
 ########################

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8159ac5/src/java/org/apache/cassandra/hints/Hint.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/Hint.java 
b/src/java/org/apache/cassandra/hints/Hint.java
index 1582a3c..b0abd50 100644
--- a/src/java/org/apache/cassandra/hints/Hint.java
+++ b/src/java/org/apache/cassandra/hints/Hint.java
@@ -24,11 +24,17 @@ import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Throwables;
 
+import javax.annotation.Nullable;
+
+import com.google.common.primitives.Ints;
+
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.utils.vint.VIntCoding;
 
 import static org.apache.cassandra.db.TypeSizes.sizeof;
 import static org.apache.cassandra.db.TypeSizes.sizeofUnsignedVInt;
@@ -51,10 +57,11 @@ import static 
org.apache.cassandra.db.TypeSizes.sizeofUnsignedVInt;
 public final class Hint
 {
     public static final Serializer serializer = new Serializer();
+    static final int maxHintTTL = Integer.getInteger("cassandra.maxHintTTL", 
Integer.MAX_VALUE);
 
     final Mutation mutation;
     final long creationTime;  // time of hint creation (in milliseconds)
-    final int gcgs; // the smallest gc gs of all involved tables
+    final int gcgs; // the smallest gc gs of all involved tables (in seconds)
 
     private Hint(Mutation mutation, long creationTime, int gcgs)
     {
@@ -115,13 +122,25 @@ public final class Hint
     }
 
     /**
+     * @return the overall ttl of the hint - the minimum of all mutation's 
tables' gc gs now and at the time of creation
+     */
+    int ttl()
+    {
+        return Math.min(gcgs, mutation.smallestGCGS());
+    }
+
+    /**
      * @return calculates whether or not it is safe to apply the hint without 
risking to resurrect any deleted data
      */
     boolean isLive()
     {
-        int smallestGCGS = Math.min(gcgs, mutation.smallestGCGS());
-        long expirationTime = creationTime + 
TimeUnit.SECONDS.toMillis(smallestGCGS);
-        return expirationTime > System.currentTimeMillis();
+        return isLive(creationTime, System.currentTimeMillis(), ttl());
+    }
+
+    static boolean isLive(long creationTime, long now, int hintTTL)
+    {
+        long expirationTime = creationTime + 
TimeUnit.SECONDS.toMillis(Math.min(hintTTL, maxHintTTL));
+        return expirationTime > now;
     }
 
     static final class Serializer implements IVersionedSerializer<Hint>
@@ -152,5 +171,60 @@ public final class Hint
         {
             return hintBuffer.getLong(0);
         }
+
+        /**
+         * Will short-circuit Mutation deserialization if the hint is 
definitely dead. If a Hint instance is
+         * returned, there is still a chance it's dead, if gcgs on one of the tables 
involved got reduced between
+         * hint creation and deserialization, but this does not impact 
correctness - an extra liveness check will
+         * also be performed on the receiving end.
+         *
+         * @return null if the hint is definitely dead, a Hint instance if 
it's likely live
+         */
+        @Nullable
+        Hint deserializeIfLive(DataInputPlus in, long now, long size, int 
version) throws IOException
+        {
+            long creationTime = in.readLong();
+            int gcgs = (int) in.readUnsignedVInt();
+            int bytesRead = sizeof(creationTime) + sizeofUnsignedVInt(gcgs);
+
+            if (isLive(creationTime, now, gcgs))
+                return new Hint(Mutation.serializer.deserialize(in, version), 
creationTime, gcgs);
+
+            in.skipBytesFully(Ints.checkedCast(size) - bytesRead);
+            return null;
+        }
+
+        /**
+         * Will short-circuit ByteBuffer allocation if the hint is definitely 
dead. If a ByteBuffer instance is
+         * returned, there is still a chance it's dead, if gcgs on one of the tables 
involved got reduced between
+         * hint creation and deserialization, but this does not impact 
correctness - an extra liveness check will
+         * also be performed on the receiving end.
+         *
+         * @return null if the hint is definitely dead, a ByteBuffer instance 
if it's likely live
+         */
+        @Nullable
+        ByteBuffer readBufferIfLive(DataInputPlus in, long now, int size, int 
version) throws IOException
+        {
+            int maxHeaderSize = Math.min(sizeof(Long.MAX_VALUE) + 
VIntCoding.MAX_SIZE, size);
+            byte[] header = new byte[maxHeaderSize];
+            in.readFully(header);
+
+            try (DataInputBuffer input = new DataInputBuffer(header))
+            {
+                long creationTime = input.readLong();
+                int gcgs = (int) input.readUnsignedVInt();
+
+                if (!isLive(creationTime, now, gcgs))
+                {
+                    in.skipBytesFully(size - maxHeaderSize);
+                    return null;
+                }
+            }
+
+            byte[] bytes = new byte[size];
+            System.arraycopy(header, 0, bytes, 0, header.length);
+            in.readFully(bytes, header.length, size - header.length);
+            return ByteBuffer.wrap(bytes);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8159ac5/src/java/org/apache/cassandra/hints/HintsReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsReader.java 
b/src/java/org/apache/cassandra/hints/HintsReader.java
index dbcd7f3..9bf55bf 100644
--- a/src/java/org/apache/cassandra/hints/HintsReader.java
+++ b/src/java/org/apache/cassandra/hints/HintsReader.java
@@ -35,7 +35,6 @@ import org.apache.cassandra.exceptions.UnknownTableException;
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.AbstractIterator;
-import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**
  * A paged non-compressed hints reader that provides two iterators:
@@ -165,6 +164,7 @@ class HintsReader implements AutoCloseable, 
Iterable<HintsReader.Page>
     final class HintsIterator extends AbstractIterator<Hint>
     {
         private final InputPosition offset;
+        private final long now = System.currentTimeMillis();
 
         HintsIterator(InputPosition offset)
         {
@@ -228,7 +228,7 @@ class HintsReader implements AutoCloseable, 
Iterable<HintsReader.Page>
             Hint hint;
             try
             {
-                hint = Hint.serializer.deserialize(input, 
descriptor.messagingVersion());
+                hint = Hint.serializer.deserializeIfLive(input, now, size, 
descriptor.messagingVersion());
                 input.checkLimit(0);
             }
             catch (UnknownTableException e)
@@ -262,6 +262,7 @@ class HintsReader implements AutoCloseable, 
Iterable<HintsReader.Page>
     final class BuffersIterator extends AbstractIterator<ByteBuffer>
     {
         private final InputPosition offset;
+        private final long now = System.currentTimeMillis();
 
         BuffersIterator(InputPosition offset)
         {
@@ -322,7 +323,7 @@ class HintsReader implements AutoCloseable, 
Iterable<HintsReader.Page>
                 rateLimiter.acquire(size);
             input.limit(size);
 
-            ByteBuffer buffer = ByteBufferUtil.read(input, size);
+            ByteBuffer buffer = Hint.serializer.readBufferIfLive(input, now, 
size, descriptor.messagingVersion());
             if (input.checkCrc())
                 return buffer;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8159ac5/src/java/org/apache/cassandra/hints/HintsWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsWriter.java 
b/src/java/org/apache/cassandra/hints/HintsWriter.java
index 48b8c7c..5997eb4 100644
--- a/src/java/org/apache/cassandra/hints/HintsWriter.java
+++ b/src/java/org/apache/cassandra/hints/HintsWriter.java
@@ -143,6 +143,12 @@ class HintsWriter implements AutoCloseable
         }
     }
 
+    @VisibleForTesting
+    File getFile()
+    {
+        return file;
+    }
+
     /**
      * Writes byte buffer into the file channel. Buffer should be flipped 
before calling this
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8159ac5/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java 
b/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java
index 094115a..086f5c9 100644
--- a/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java
+++ b/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java
@@ -112,7 +112,7 @@ public abstract class RebufferingInputStream extends 
InputStream implements Data
     @Override
     public int skipBytes(int n) throws IOException
     {
-        if (n < 0)
+        if (n <= 0)
             return 0;
         int requested = n;
         int position = buffer.position(), limit = buffer.limit(), remaining;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8159ac5/src/java/org/apache/cassandra/utils/vint/VIntCoding.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/vint/VIntCoding.java 
b/src/java/org/apache/cassandra/utils/vint/VIntCoding.java
index b490b97..a8a1654 100644
--- a/src/java/org/apache/cassandra/utils/vint/VIntCoding.java
+++ b/src/java/org/apache/cassandra/utils/vint/VIntCoding.java
@@ -59,6 +59,7 @@ import net.nicoulaj.compilecommand.annotations.Inline;
  */
 public class VIntCoding
 {
+    public static final int MAX_SIZE = 10;
 
     public static long readUnsignedVInt(DataInput input) throws IOException
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8159ac5/test/unit/org/apache/cassandra/hints/HintWriteTTLTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/HintWriteTTLTest.java 
b/test/unit/org/apache/cassandra/hints/HintWriteTTLTest.java
new file mode 100644
index 0000000..b06187d
--- /dev/null
+++ b/test/unit/org/apache/cassandra/hints/HintWriteTTLTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.hints;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+
+public class HintWriteTTLTest
+{
+    private static int TTL = 500;
+    private static int GC_GRACE = 84600;
+
+    private static Hint makeHint(TableMetadata tbm, int key, int creationTime, 
int gcgs)
+    {
+        PartitionUpdate update = PartitionUpdate.fullPartitionDelete(tbm,
+                                                                     
ByteBufferUtil.bytes(key),
+                                                                     
s2m(creationTime),
+                                                                     
creationTime);
+        Mutation mutation = new Mutation(update);
+        return Hint.create(mutation, s2m(creationTime), gcgs);
+    }
+
+    private static DecoratedKey hintKey(Hint hint)
+    {
+        return hint.mutation.key();
+    }
+
+    private static Hint deserialize(ByteBuffer bb) throws IOException
+    {
+        DataInputBuffer input = new DataInputBuffer(bb, true);
+        try
+        {
+            return Hint.serializer.deserialize(input, 
MessagingService.current_version);
+        }
+        finally
+        {
+            input.close();
+        }
+    }
+
+    private static Hint ttldHint = null;
+    private static Hint liveHint = null;
+    private static File hintFile = null;
+
+    @BeforeClass
+    public static void setupClass() throws Exception
+    {
+        System.setProperty("cassandra.maxHintTTL", Integer.toString(TTL));
+        SchemaLoader.prepareServer();
+        TableMetadata tbm = CreateTableStatement.parse("CREATE TABLE tbl (k 
INT PRIMARY KEY, v INT)", "ks").gcGraceSeconds(GC_GRACE).build();
+        SchemaLoader.createKeyspace("ks", KeyspaceParams.simple(1), tbm);
+
+        int nowInSeconds = FBUtilities.nowInSeconds();
+        liveHint = makeHint(tbm, 1, nowInSeconds, GC_GRACE);
+        ttldHint = makeHint(tbm, 2, nowInSeconds - (TTL + 1), GC_GRACE);
+
+
+        File directory = Files.createTempDirectory(null).toFile();
+        HintsDescriptor descriptor = new 
HintsDescriptor(UUIDGen.getTimeUUID(), s2m(nowInSeconds));
+
+        try (HintsWriter writer = HintsWriter.create(directory, descriptor);
+             HintsWriter.Session session = 
writer.newSession(ByteBuffer.allocate(1024)))
+        {
+            session.append(liveHint);
+            session.append(ttldHint);
+            hintFile = writer.getFile();
+        }
+    }
+
+    private static long s2m(int seconds)
+    {
+        return TimeUnit.SECONDS.toMillis(seconds);
+    }
+
+    @Test
+    public void isLive() throws Exception
+    {
+        // max ttl is set to 500
+        Assert.assertTrue(Hint.isLive(s2m(0), s2m(499), 500));  // still live
+        Assert.assertFalse(Hint.isLive(s2m(0), s2m(499), 499)); // expired due 
to hint's own ttl
+        Assert.assertFalse(Hint.isLive(s2m(0), s2m(500), 501)); // expired due 
to max ttl
+    }
+
+    @Test
+    public void hintIsLive() throws Exception
+    {
+        Assert.assertTrue(liveHint.isLive());
+        Assert.assertFalse(ttldHint.isLive());
+    }
+
+    @Test
+    public void hintIterator() throws Exception
+    {
+        List<Hint> hints = new ArrayList<>();
+        try (HintsReader reader = HintsReader.open(hintFile))
+        {
+            for (HintsReader.Page page: reader)
+            {
+                Iterator<Hint> iter = page.hintsIterator();
+                while (iter.hasNext())
+                {
+                    hints.add(iter.next());
+                }
+            }
+        }
+
+        Assert.assertEquals(1, hints.size());
+        Assert.assertEquals(hintKey(liveHint), hintKey(hints.get(0)));
+    }
+
+    @Test
+    public void bufferIterator() throws Exception
+    {
+        List<Hint> hints = new ArrayList<>();
+        try (HintsReader reader = HintsReader.open(hintFile))
+        {
+            for (HintsReader.Page page: reader)
+            {
+                Iterator<ByteBuffer> iter = page.buffersIterator();
+                while (iter.hasNext())
+                {
+                    hints.add(deserialize(iter.next()));
+                }
+            }
+        }
+
+        Assert.assertEquals(1, hints.size());
+        Assert.assertEquals(hintKey(liveHint), hintKey(hints.get(0)));
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to