This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new ce27008 Invalid serialized size for responses caused by increasing
message time by 1ms which caused extra bytes in size calculation
ce27008 is described below
commit ce270081bc0bc8ffa0a7e1e5c04f30b5c1875a84
Author: Yifan Cai <[email protected]>
AuthorDate: Thu Oct 29 10:03:56 2020 -0700
Invalid serialized size for responses caused by increasing message time by
1ms which caused extra bytes in size calculation
patch by Yifan Cai; reviewed by David Capwell, Jon Meredith for
CASSANDRA-16103
---
CHANGES.txt | 1 +
src/java/org/apache/cassandra/net/Message.java | 4 +--
.../unit/org/apache/cassandra/net/FramingTest.java | 36 ++++++++++++++++++++++
.../net/MessageSerializationPropertyTest.java | 3 ++
.../unit/org/apache/cassandra/net/MessageTest.java | 2 +-
.../cassandra/net/OutboundMessageQueueTest.java | 4 +--
.../cassandra/utils/CassandraGenerators.java | 34 +++++++++++++++++++-
.../cassandra/utils/FixedMonotonicClock.java | 8 ++++-
.../apache/cassandra/utils/FreeRunningClock.java | 12 +++++++-
.../org/apache/cassandra/utils/Generators.java | 12 +++++++-
10 files changed, 107 insertions(+), 9 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 01e0ad7..e3b4cde 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -24,6 +24,7 @@
* Show the progress of data streaming and index build (CASSANDRA-15406)
* Add flag to disable chunk cache and disable by default (CASSANDRA-16036)
* Upgrade to snakeyaml >= 1.26 version for CVE-2017-18640 fix
(CASSANDRA-16150)
+ * Invalid serialized size for responses caused by increasing message time by
1ms which caused extra bytes in size calculation (CASSANDRA-16103)
Merged from 3.11:
* Fix ColumnFilter to avoid querying cells of unselected complex columns
(CASSANDRA-15977)
* Fix memory leak in CompressedChunkReader (CASSANDRA-15880)
diff --git a/src/java/org/apache/cassandra/net/Message.java
b/src/java/org/apache/cassandra/net/Message.java
index e0a2e7f..a501730 100644
--- a/src/java/org/apache/cassandra/net/Message.java
+++ b/src/java/org/apache/cassandra/net/Message.java
@@ -367,8 +367,8 @@ public class Message<T>
this.id = id;
this.verb = verb;
this.from = from;
- this.createdAtNanos = createdAtNanos;
this.expiresAtNanos = expiresAtNanos;
+ this.createdAtNanos = createdAtNanos;
this.flags = flags;
this.params = params;
}
@@ -717,7 +717,7 @@ public class Message<T>
long size = 0;
size += sizeofUnsignedVInt(header.id);
size += CREATION_TIME_SIZE;
- size += sizeofUnsignedVInt(1 +
NANOSECONDS.toMillis(header.expiresAtNanos - header.createdAtNanos));
+ size +=
sizeofUnsignedVInt(NANOSECONDS.toMillis(header.expiresAtNanos -
header.createdAtNanos));
size += sizeofUnsignedVInt(header.verb.id);
size += sizeofUnsignedVInt(header.flags);
size += serializedParamsSize(header.params, version);
diff --git a/test/unit/org/apache/cassandra/net/FramingTest.java
b/test/unit/org/apache/cassandra/net/FramingTest.java
index 27c8003..b79c90c 100644
--- a/test/unit/org/apache/cassandra/net/FramingTest.java
+++ b/test/unit/org/apache/cassandra/net/FramingTest.java
@@ -25,6 +25,8 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -38,9 +40,12 @@ import io.netty.buffer.ByteBuf;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputBufferFixed;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.memory.BufferPools;
import org.apache.cassandra.utils.vint.VIntCoding;
@@ -213,6 +218,37 @@ public class FramingTest
burnRandomLegacy(1000);
}
+ @Test
+ public void testSerializeSizeMatchesEdgeCases() // See CASSANDRA-16103
+ {
+ int v40 = MessagingService.Version.VERSION_40.value;
+ Consumer<Long> subTest = timeGapInMillis ->
+ {
+ long createdAt = 0;
+ long expiresAt = createdAt +
TimeUnit.MILLISECONDS.toNanos(timeGapInMillis);
+ Message<NoPayload> message = Message.builder(Verb.READ_REPAIR_RSP,
NoPayload.noPayload)
+
.from(FBUtilities.getBroadcastAddressAndPort())
+ .withCreatedAt(createdAt)
+ .withExpiresAt(expiresAt)
+ .build();
+
+ try (DataOutputBuffer out = new DataOutputBuffer(20))
+ {
+ Message.serializer.serialize(message, out, v40);
+ Assert.assertEquals(message.serializedSize(v40),
out.getLength());
+ }
+ catch (IOException ioe)
+ {
+ Assert.fail("Unexpected IOEception during test. " +
ioe.getMessage());
+ }
+ };
+
+ // test cases
+ subTest.accept(-1L);
+ subTest.accept(1L << 7 - 1);
+ subTest.accept(1L << 14 - 1);
+ }
+
private void burnRandomLegacy(int count)
{
SecureRandom seed = new SecureRandom();
diff --git
a/test/unit/org/apache/cassandra/net/MessageSerializationPropertyTest.java
b/test/unit/org/apache/cassandra/net/MessageSerializationPropertyTest.java
index 3cde6f9..5a73cbd 100644
--- a/test/unit/org/apache/cassandra/net/MessageSerializationPropertyTest.java
+++ b/test/unit/org/apache/cassandra/net/MessageSerializationPropertyTest.java
@@ -101,6 +101,9 @@ public class MessageSerializationPropertyTest implements
Serializable
first.clear();
second.clear();
+ // sync the clock with the generated createdAtNanos
+
FixedMonotonicClock.setNowInNanos(message.createdAtNanos());
+
serializer.serialize(message, first, version.value);
Message<Object> read = serializer.deserialize(new
DataInputBuffer(first.buffer(), true),
FBUtilities.getBroadcastAddressAndPort(), version.value);
serializer.serialize(read, second, version.value);
diff --git a/test/unit/org/apache/cassandra/net/MessageTest.java
b/test/unit/org/apache/cassandra/net/MessageTest.java
index 6a9d23f..f32219c 100644
--- a/test/unit/org/apache/cassandra/net/MessageTest.java
+++ b/test/unit/org/apache/cassandra/net/MessageTest.java
@@ -38,7 +38,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.tracing.Tracing.TraceType;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.UUIDGen;
+import org.assertj.core.api.Assertions;
import static org.apache.cassandra.net.Message.serializer;
import static org.apache.cassandra.net.MessagingService.VERSION_3014;
diff --git a/test/unit/org/apache/cassandra/net/OutboundMessageQueueTest.java
b/test/unit/org/apache/cassandra/net/OutboundMessageQueueTest.java
index 860e4f1..de64a26 100644
--- a/test/unit/org/apache/cassandra/net/OutboundMessageQueueTest.java
+++ b/test/unit/org/apache/cassandra/net/OutboundMessageQueueTest.java
@@ -95,7 +95,7 @@ public class OutboundMessageQueueTest
@Test
public void testExpirationOnIteration()
{
- FreeRunningClock clock = new FreeRunningClock();
+ FreeRunningClock clock = new FreeRunningClock(approxTime.now());
List<Message> expiredMessages = new LinkedList<>();
long startTime = clock.now();
@@ -170,7 +170,7 @@ public class OutboundMessageQueueTest
@Test
public void testExpirationOnAdd()
{
- FreeRunningClock clock = new FreeRunningClock();
+ FreeRunningClock clock = new FreeRunningClock(approxTime.now());
List<Message> expiredMessages = new LinkedList<>();
long startTime = clock.now();
diff --git a/test/unit/org/apache/cassandra/utils/CassandraGenerators.java
b/test/unit/org/apache/cassandra/utils/CassandraGenerators.java
index 9e71e62..4d51f5c 100644
--- a/test/unit/org/apache/cassandra/utils/CassandraGenerators.java
+++ b/test/unit/org/apache/cassandra/utils/CassandraGenerators.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.utils;
import java.lang.reflect.Modifier;
+import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
@@ -52,6 +53,7 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.ConnectionType;
import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.NoPayload;
import org.apache.cassandra.net.PingRequest;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.schema.ColumnMetadata;
@@ -67,6 +69,9 @@ import org.quicktheories.impl.Constraint;
import static org.apache.cassandra.utils.AbstractTypeGenerators.allowReversed;
import static org.apache.cassandra.utils.AbstractTypeGenerators.getTypeSupport;
import static org.apache.cassandra.utils.Generators.IDENTIFIER_GEN;
+import static org.apache.cassandra.utils.Generators.SMALL_TIME_SPAN_NANOS;
+import static org.apache.cassandra.utils.Generators.TIMESTAMP_NANOS;
+import static org.apache.cassandra.utils.Generators.TINY_TIME_SPAN_NANOS;
public final class CassandraGenerators
{
@@ -74,8 +79,14 @@ public final class CassandraGenerators
// utility generators for creating more complex types
private static final Gen<Integer> SMALL_POSITIVE_SIZE_GEN =
SourceDSL.integers().between(1, 30);
+ private static final Gen<Integer> NETWORK_PORT_GEN =
SourceDSL.integers().between(0, 0xFFFF);
private static final Gen<Boolean> BOOLEAN_GEN = SourceDSL.booleans().all();
+ public static final Gen<InetAddressAndPort> INET_ADDRESS_AND_PORT_GEN =
rnd -> {
+ InetAddress address = Generators.INET_ADDRESS_GEN.generate(rnd);
+ return InetAddressAndPort.getByAddressOverrideDefaults(address,
NETWORK_PORT_GEN.generate(rnd));
+ };
+
private static final Gen<IPartitioner> PARTITIONER_GEN =
SourceDSL.arbitrary().pick(Murmur3Partitioner.instance,
ByteOrderedPartitioner.instance,
new LocalPartitioner(TimeUUIDType.instance),
@@ -105,8 +116,29 @@ public final class CassandraGenerators
.<Message<? extends ReadCommand>>map(c ->
Message.builder(Verb.READ_REQ, c).build())
.describedAs(CassandraGenerators::toStringRecursive);
+ private static Gen<Message<NoPayload>> responseGen(Verb verb)
+ {
+ return gen(rnd -> {
+ long timeSpan = SMALL_TIME_SPAN_NANOS.generate(rnd);
+ long delay = TINY_TIME_SPAN_NANOS.generate(rnd); // network &
processing delay
+ long requestCreatedAt = TIMESTAMP_NANOS.generate(rnd);
+ long createdAt = requestCreatedAt + delay;
+ long expiresAt = requestCreatedAt + timeSpan;
+ return Message.builder(verb, NoPayload.noPayload)
+ .withCreatedAt(createdAt)
+ .withExpiresAt(expiresAt)
+ .from(INET_ADDRESS_AND_PORT_GEN.generate(rnd))
+ .build();
+ }).describedAs(CassandraGenerators::toStringRecursive);
+ }
+
+ public static final Gen<Message<NoPayload>> MUTATION_RSP_GEN =
responseGen(Verb.MUTATION_RSP);
+ public static final Gen<Message<NoPayload>> READ_REPAIR_RSP_GEN =
responseGen(Verb.READ_REPAIR_RSP);
+
public static final Gen<Message<?>> MESSAGE_GEN =
Generate.oneOf(cast(MESSAGE_PING_GEN),
-
cast(MESSAGE_READ_COMMAND_GEN))
+
cast(MESSAGE_READ_COMMAND_GEN),
+
cast(MUTATION_RSP_GEN),
+
cast(READ_REPAIR_RSP_GEN))
.describedAs(CassandraGenerators::toStringRecursive);
private CassandraGenerators()
diff --git a/test/unit/org/apache/cassandra/utils/FixedMonotonicClock.java
b/test/unit/org/apache/cassandra/utils/FixedMonotonicClock.java
index 7753321..2c68450 100644
--- a/test/unit/org/apache/cassandra/utils/FixedMonotonicClock.java
+++ b/test/unit/org/apache/cassandra/utils/FixedMonotonicClock.java
@@ -21,9 +21,15 @@ import java.util.concurrent.TimeUnit;
public final class FixedMonotonicClock implements MonotonicClock
{
+ private static volatile long nowInNanos = 42;
+
+ public static void setNowInNanos(long nowInNanos) {
+ FixedMonotonicClock.nowInNanos = nowInNanos;
+ }
+
public long now()
{
- return 42;
+ return nowInNanos;
}
public long error()
diff --git a/test/unit/org/apache/cassandra/utils/FreeRunningClock.java
b/test/unit/org/apache/cassandra/utils/FreeRunningClock.java
index d853833..4d8a5f6 100644
--- a/test/unit/org/apache/cassandra/utils/FreeRunningClock.java
+++ b/test/unit/org/apache/cassandra/utils/FreeRunningClock.java
@@ -25,7 +25,17 @@ import java.util.concurrent.TimeUnit;
*/
public class FreeRunningClock implements MonotonicClock
{
- private long nanoTime = 0;
+ private long nanoTime;
+
+ public FreeRunningClock()
+ {
+ this.nanoTime = 0;
+ }
+
+ public FreeRunningClock(long nanoTime)
+ {
+ this.nanoTime = nanoTime;
+ }
@Override
public long now()
diff --git a/test/unit/org/apache/cassandra/utils/Generators.java
b/test/unit/org/apache/cassandra/utils/Generators.java
index 43d4335..179c0f4 100644
--- a/test/unit/org/apache/cassandra/utils/Generators.java
+++ b/test/unit/org/apache/cassandra/utils/Generators.java
@@ -27,6 +27,7 @@ import java.time.ZonedDateTime;
import java.util.Date;
import java.util.Random;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import org.apache.commons.lang3.ArrayUtils;
@@ -177,21 +178,30 @@ public final class Generators
// all time is boxed in the future around 50 years from today: Aug 20th,
2020 UTC
public static final Gen<Timestamp> TIMESTAMP_GEN;
public static final Gen<Date> DATE_GEN;
+ public static final Gen<Long> TIMESTAMP_NANOS;
+ public static final Gen<Long> SMALL_TIME_SPAN_NANOS; // generate nanos in
[0, 10] seconds
+ public static final Gen<Long> TINY_TIME_SPAN_NANOS; // generate nanos in
[0, 1) seconds
static
{
+ long secondInNanos = 1_000_000_000L;
ZonedDateTime now = ZonedDateTime.of(2020, 8, 20,
0, 0, 0, 0, ZoneOffset.UTC);
ZonedDateTime startOfTime = now.minusYears(50);
ZonedDateTime endOfDays = now.plusYears(50);
Constraint millisConstraint =
Constraint.between(startOfTime.toInstant().toEpochMilli(),
endOfDays.toInstant().toEpochMilli());
- Constraint nanosInSecondConstraint = Constraint.between(0, 999999999);
+ Constraint nanosInSecondConstraint = Constraint.between(0,
secondInNanos - 1);
+ // Represents the timespan based on the most of the default request
timeouts. See DatabaseDescriptor
+ Constraint smallTimeSpanNanosConstraint = Constraint.between(0, 10 *
secondInNanos);
TIMESTAMP_GEN = rnd -> {
Timestamp ts = new Timestamp(rnd.next(millisConstraint));
ts.setNanos((int) rnd.next(nanosInSecondConstraint));
return ts;
};
DATE_GEN = TIMESTAMP_GEN.map(t -> new Date(t.getTime()));
+ TIMESTAMP_NANOS = TIMESTAMP_GEN.map(t ->
TimeUnit.MILLISECONDS.toNanos(t.getTime()) + t.getNanos());
+ SMALL_TIME_SPAN_NANOS = rnd -> rnd.next(smallTimeSpanNanosConstraint);
+ TINY_TIME_SPAN_NANOS = rnd -> rnd.next(nanosInSecondConstraint);
}
private Generators()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]