Repository: cassandra Updated Branches: refs/heads/trunk 01ade6f25 -> f8159ac50
Bring back maxHintTTL property Patch by Blake Eggleston, reviewed by Aleksey Yeschenko for CASSANDRA-12982 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f8159ac5 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f8159ac5 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f8159ac5 Branch: refs/heads/trunk Commit: f8159ac5071d53e8bc9c28e3a1eaf3ba798ff287 Parents: 01ade6f Author: Blake Eggleston <[email protected]> Authored: Tue Nov 8 14:11:47 2016 -0800 Committer: Blake Eggleston <[email protected]> Committed: Tue May 16 14:37:33 2017 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + conf/jvm.options | 3 + src/java/org/apache/cassandra/hints/Hint.java | 82 ++++++++- .../org/apache/cassandra/hints/HintsReader.java | 7 +- .../org/apache/cassandra/hints/HintsWriter.java | 6 + .../io/util/RebufferingInputStream.java | 2 +- .../apache/cassandra/utils/vint/VIntCoding.java | 1 + .../cassandra/hints/HintWriteTTLTest.java | 169 +++++++++++++++++++ 8 files changed, 263 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8159ac5/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 03870dd..9d1323d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Bring back maxHintTTL propery (CASSANDRA-12982) * Add testing guidelines (CASSANDRA-13497) * Add more repair metrics (CASSANDRA-13531) * RangeStreamer should be smarter when picking endpoints for streaming (CASSANDRA-4650) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8159ac5/conf/jvm.options ---------------------------------------------------------------------- diff --git a/conf/jvm.options b/conf/jvm.options index 49b2196..398b52f 100644 --- a/conf/jvm.options +++ b/conf/jvm.options @@ -79,6 
+79,9 @@ # 10000 rows per page. #-Dcassandra.force_default_indexing_page_size=true +# Imposes an upper bound on hint lifetime below the normal min gc_grace_seconds +#-Dcassandra.maxHintTTL=max_hint_ttl_in_seconds + ######################## # GENERAL JVM SETTINGS # ######################## http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8159ac5/src/java/org/apache/cassandra/hints/Hint.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/Hint.java b/src/java/org/apache/cassandra/hints/Hint.java index 1582a3c..b0abd50 100644 --- a/src/java/org/apache/cassandra/hints/Hint.java +++ b/src/java/org/apache/cassandra/hints/Hint.java @@ -24,11 +24,17 @@ import java.util.concurrent.TimeUnit; import com.google.common.base.Throwables; +import javax.annotation.Nullable; + +import com.google.common.primitives.Ints; + import org.apache.cassandra.db.*; import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.utils.vint.VIntCoding; import static org.apache.cassandra.db.TypeSizes.sizeof; import static org.apache.cassandra.db.TypeSizes.sizeofUnsignedVInt; @@ -51,10 +57,11 @@ import static org.apache.cassandra.db.TypeSizes.sizeofUnsignedVInt; public final class Hint { public static final Serializer serializer = new Serializer(); + static final int maxHintTTL = Integer.getInteger("cassandra.maxHintTTL", Integer.MAX_VALUE); final Mutation mutation; final long creationTime; // time of hint creation (in milliseconds) - final int gcgs; // the smallest gc gs of all involved tables + final int gcgs; // the smallest gc gs of all involved tables (in seconds) private Hint(Mutation mutation, long creationTime, int gcgs) { @@ -115,13 +122,25 @@ public final class Hint } /** + * @return the overall 
ttl of the hint - the minimum of all mutation's tables' gc gs now and at the time of creation + */ + int ttl() + { + return Math.min(gcgs, mutation.smallestGCGS()); + } + + /** * @return calculates whether or not it is safe to apply the hint without risking to resurrect any deleted data */ boolean isLive() { - int smallestGCGS = Math.min(gcgs, mutation.smallestGCGS()); - long expirationTime = creationTime + TimeUnit.SECONDS.toMillis(smallestGCGS); - return expirationTime > System.currentTimeMillis(); + return isLive(creationTime, System.currentTimeMillis(), ttl()); + } + + static boolean isLive(long creationTime, long now, int hintTTL) + { + long expirationTime = creationTime + TimeUnit.SECONDS.toMillis(Math.min(hintTTL, maxHintTTL)); + return expirationTime > now; } static final class Serializer implements IVersionedSerializer<Hint> @@ -152,5 +171,60 @@ public final class Hint { return hintBuffer.getLong(0); } + + /** + * Will short-circuit Mutation deserialization if the hint is definitely dead. If a Hint instance is + * returned, there is a chance it's live, if gcgs on one of the table involved got reduced between + * hint creation and deserialization, but this does not impact correctness - an extra liveness check will + * also be performed on the receiving end. + * + * @return null if the hint is definitely dead, a Hint instance if it's likely live + */ + @Nullable + Hint deserializeIfLive(DataInputPlus in, long now, long size, int version) throws IOException + { + long creationTime = in.readLong(); + int gcgs = (int) in.readUnsignedVInt(); + int bytesRead = sizeof(creationTime) + sizeofUnsignedVInt(gcgs); + + if (isLive(creationTime, now, gcgs)) + return new Hint(Mutation.serializer.deserialize(in, version), creationTime, gcgs); + + in.skipBytesFully(Ints.checkedCast(size) - bytesRead); + return null; + } + + /** + * Will short-circuit ByteBuffer allocation if the hint is definitely dead. 
If a ByteBuffer instance is + * returned, there is a chance it's live, if gcgs on one of the table involved got reduced between + * hint creation and deserialization, but this does not impact correctness - an extra liveness check will + * also be performed on the receiving end. + * + * @return null if the hint is definitely dead, a ByteBuffer instance if it's likely live + */ + @Nullable + ByteBuffer readBufferIfLive(DataInputPlus in, long now, int size, int version) throws IOException + { + int maxHeaderSize = Math.min(sizeof(Long.MAX_VALUE) + VIntCoding.MAX_SIZE, size); + byte[] header = new byte[maxHeaderSize]; + in.readFully(header); + + try (DataInputBuffer input = new DataInputBuffer(header)) + { + long creationTime = input.readLong(); + int gcgs = (int) input.readUnsignedVInt(); + + if (!isLive(creationTime, now, gcgs)) + { + in.skipBytesFully(size - maxHeaderSize); + return null; + } + } + + byte[] bytes = new byte[size]; + System.arraycopy(header, 0, bytes, 0, header.length); + in.readFully(bytes, header.length, size - header.length); + return ByteBuffer.wrap(bytes); + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8159ac5/src/java/org/apache/cassandra/hints/HintsReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintsReader.java b/src/java/org/apache/cassandra/hints/HintsReader.java index dbcd7f3..9bf55bf 100644 --- a/src/java/org/apache/cassandra/hints/HintsReader.java +++ b/src/java/org/apache/cassandra/hints/HintsReader.java @@ -35,7 +35,6 @@ import org.apache.cassandra.exceptions.UnknownTableException; import org.apache.cassandra.io.FSReadError; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.AbstractIterator; -import org.apache.cassandra.utils.ByteBufferUtil; /** * A paged non-compressed hints reader that provides two iterators: @@ -165,6 +164,7 @@ class HintsReader implements AutoCloseable, Iterable<HintsReader.Page> 
final class HintsIterator extends AbstractIterator<Hint> { private final InputPosition offset; + private final long now = System.currentTimeMillis(); HintsIterator(InputPosition offset) { @@ -228,7 +228,7 @@ class HintsReader implements AutoCloseable, Iterable<HintsReader.Page> Hint hint; try { - hint = Hint.serializer.deserialize(input, descriptor.messagingVersion()); + hint = Hint.serializer.deserializeIfLive(input, now, size, descriptor.messagingVersion()); input.checkLimit(0); } catch (UnknownTableException e) @@ -262,6 +262,7 @@ class HintsReader implements AutoCloseable, Iterable<HintsReader.Page> final class BuffersIterator extends AbstractIterator<ByteBuffer> { private final InputPosition offset; + private final long now = System.currentTimeMillis(); BuffersIterator(InputPosition offset) { @@ -322,7 +323,7 @@ class HintsReader implements AutoCloseable, Iterable<HintsReader.Page> rateLimiter.acquire(size); input.limit(size); - ByteBuffer buffer = ByteBufferUtil.read(input, size); + ByteBuffer buffer = Hint.serializer.readBufferIfLive(input, now, size, descriptor.messagingVersion()); if (input.checkCrc()) return buffer; http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8159ac5/src/java/org/apache/cassandra/hints/HintsWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintsWriter.java b/src/java/org/apache/cassandra/hints/HintsWriter.java index 48b8c7c..5997eb4 100644 --- a/src/java/org/apache/cassandra/hints/HintsWriter.java +++ b/src/java/org/apache/cassandra/hints/HintsWriter.java @@ -143,6 +143,12 @@ class HintsWriter implements AutoCloseable } } + @VisibleForTesting + File getFile() + { + return file; + } + /** * Writes byte buffer into the file channel. 
Buffer should be flipped before calling this */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8159ac5/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java b/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java index 094115a..086f5c9 100644 --- a/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java +++ b/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java @@ -112,7 +112,7 @@ public abstract class RebufferingInputStream extends InputStream implements Data @Override public int skipBytes(int n) throws IOException { - if (n < 0) + if (n <= 0) return 0; int requested = n; int position = buffer.position(), limit = buffer.limit(), remaining; http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8159ac5/src/java/org/apache/cassandra/utils/vint/VIntCoding.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/vint/VIntCoding.java b/src/java/org/apache/cassandra/utils/vint/VIntCoding.java index b490b97..a8a1654 100644 --- a/src/java/org/apache/cassandra/utils/vint/VIntCoding.java +++ b/src/java/org/apache/cassandra/utils/vint/VIntCoding.java @@ -59,6 +59,7 @@ import net.nicoulaj.compilecommand.annotations.Inline; */ public class VIntCoding { + public static final int MAX_SIZE = 10; public static long readUnsignedVInt(DataInput input) throws IOException { http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8159ac5/test/unit/org/apache/cassandra/hints/HintWriteTTLTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/hints/HintWriteTTLTest.java b/test/unit/org/apache/cassandra/hints/HintWriteTTLTest.java new file mode 100644 index 0000000..b06187d --- /dev/null +++ b/test/unit/org/apache/cassandra/hints/HintWriteTTLTest.java @@ 
-0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.hints; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.cql3.statements.CreateTableStatement; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.UUIDGen; + +public class HintWriteTTLTest +{ + private static int TTL = 500; + private static int GC_GRACE = 84600; + + private static Hint makeHint(TableMetadata tbm, int key, int creationTime, int gcgs) + { + 
PartitionUpdate update = PartitionUpdate.fullPartitionDelete(tbm, + ByteBufferUtil.bytes(key), + s2m(creationTime), + creationTime); + Mutation mutation = new Mutation(update); + return Hint.create(mutation, s2m(creationTime), gcgs); + } + + private static DecoratedKey hintKey(Hint hint) + { + return hint.mutation.key(); + } + + private static Hint deserialize(ByteBuffer bb) throws IOException + { + DataInputBuffer input = new DataInputBuffer(bb, true); + try + { + return Hint.serializer.deserialize(input, MessagingService.current_version); + } + finally + { + input.close(); + } + } + + private static Hint ttldHint = null; + private static Hint liveHint = null; + private static File hintFile = null; + + @BeforeClass + public static void setupClass() throws Exception + { + System.setProperty("cassandra.maxHintTTL", Integer.toString(TTL)); + SchemaLoader.prepareServer(); + TableMetadata tbm = CreateTableStatement.parse("CREATE TABLE tbl (k INT PRIMARY KEY, v INT)", "ks").gcGraceSeconds(GC_GRACE).build(); + SchemaLoader.createKeyspace("ks", KeyspaceParams.simple(1), tbm); + + int nowInSeconds = FBUtilities.nowInSeconds(); + liveHint = makeHint(tbm, 1, nowInSeconds, GC_GRACE); + ttldHint = makeHint(tbm, 2, nowInSeconds - (TTL + 1), GC_GRACE); + + + File directory = Files.createTempDirectory(null).toFile(); + HintsDescriptor descriptor = new HintsDescriptor(UUIDGen.getTimeUUID(), s2m(nowInSeconds)); + + try (HintsWriter writer = HintsWriter.create(directory, descriptor); + HintsWriter.Session session = writer.newSession(ByteBuffer.allocate(1024))) + { + session.append(liveHint); + session.append(ttldHint); + hintFile = writer.getFile(); + } + } + + private static long s2m(int seconds) + { + return TimeUnit.SECONDS.toMillis(seconds); + } + + @Test + public void isLive() throws Exception + { + // max ttl is set to 500 + Assert.assertTrue(Hint.isLive(s2m(0), s2m(499), 500)); // still live + Assert.assertFalse(Hint.isLive(s2m(0), s2m(499), 499)); // expired due to hint's 
own ttl + Assert.assertFalse(Hint.isLive(s2m(0), s2m(500), 501)); // expired due to max ttl + } + + @Test + public void hintIsLive() throws Exception + { + Assert.assertTrue(liveHint.isLive()); + Assert.assertFalse(ttldHint.isLive()); + } + + @Test + public void hintIterator() throws Exception + { + List<Hint> hints = new ArrayList<>(); + try (HintsReader reader = HintsReader.open(hintFile)) + { + for (HintsReader.Page page: reader) + { + Iterator<Hint> iter = page.hintsIterator(); + while (iter.hasNext()) + { + hints.add(iter.next()); + } + } + } + + Assert.assertEquals(1, hints.size()); + Assert.assertEquals(hintKey(liveHint), hintKey(hints.get(0))); + } + + @Test + public void bufferIterator() throws Exception + { + List<Hint> hints = new ArrayList<>(); + try (HintsReader reader = HintsReader.open(hintFile)) + { + for (HintsReader.Page page: reader) + { + Iterator<ByteBuffer> iter = page.buffersIterator(); + while (iter.hasNext()) + { + hints.add(deserialize(iter.next())); + } + } + } + + Assert.assertEquals(1, hints.size()); + Assert.assertEquals(hintKey(liveHint), hintKey(hints.get(0))); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
