[
https://issues.apache.org/jira/browse/CASSANDRA-9754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15566748#comment-15566748
]
Michael Kjellman edited comment on CASSANDRA-9754 at 10/11/16 10:18 PM:
------------------------------------------------------------------------
Attaching an initial set of very rough graphs showing the last 12 hours of
stress/performance testing that's been running. I apologize ahead of time for
some of the graphs -- I wanted to include the average, p99.9th, and count for
all key metrics, and in some cases the values overlap and my graphing-fu
wasn't good enough to improve the readability. I'll take another pass when
I get some time with the next round of performance testing. The "large" CQL
partitions in all 3 clusters are currently (and were for the duration of the
test) between ~6GB and ~12.5GB, although I'm planning on running the
stress/performance tests in all 3 clusters until the "large" CQL partitions
hit ~50GB. The load was started in all 3 clusters (all 3 totally empty at the
start) at the same time, from the same stress tool code that I wrote
specifically to realistically test Birch after repeated attempts to generate
a good workload with cassandra-stress failed. Some details of the stress
tool and the load generated for these graphs are below.
h3. Three read-write workloads generate the load during these tests.
I wrote the following two methods for the "simple-cassandra-stress" tool I
threw together; they generate the keys that the worker threads operate on.
I'll refer to them below when describing how the stress load is generated.
{code:java}
import java.nio.charset.Charset;
import java.security.InvalidParameterException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.TokenRange;
import com.google.common.hash.HashCode;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;

// Generates `number` keys by hashing a bounded random long (1..300000), so the
// space of possible keys is intentionally small and keys repeat across
// iterations. RANDOM_THREAD_LOCAL is the tool's ThreadLocal<Random>.
public static List<HashCode> generateRandomKeys(int number)
{
    List<HashCode> keysToOperateOn = new ArrayList<>();
    HashFunction hf = Hashing.murmur3_128();
    for (int i = 0; i < number; i++)
    {
        HashCode hashedKey = hf.newHasher().putLong(RANDOM_THREAD_LOCAL.get().nextInt(300000) + 1).hash();
        keysToOperateOn.add(hashedKey);
    }
    return keysToOperateOn;
}

// Deterministically generates (for a given seed) a set of keys that is evenly
// distributed across the cluster's token ranges.
public static List<HashCode> generateEvenlySpacedPredictableKeys(int number, int offset, String seed, Cluster cluster)
throws InvalidParameterException
{
    Set<TokenRange> tokenRanges = cluster.getMetadata().getTokenRanges();
    int numberOfKeysToGenerate = (number < tokenRanges.size()) ? tokenRanges.size() : number;
    Long[] tokens = new Long[numberOfKeysToGenerate];
    int pos = 0;
    int numberOfSplits = (number <= tokenRanges.size()) ? 1 : (number / tokenRanges.size()) + 1;

    // Split every token range evenly and record the start token of each split
    for (TokenRange tokenRange : tokenRanges)
    {
        for (TokenRange splitTokenRange : tokenRange.splitEvenly(numberOfSplits))
        {
            if (pos >= tokens.length)
                break;
            tokens[pos++] = (Long) splitTokenRange.getStart().getValue();
        }
        if (pos >= tokens.length)
            break;
    }

    // Hash seed + counter until every split has exactly one key that hashes
    // into it; the same seed always yields the same keys.
    HashCode[] randomKeys = new HashCode[tokens.length];
    int pendingRandomKeys = tokens.length;
    while (pendingRandomKeys > 0)
    {
        for (int i = offset; i < (offset + numberOfKeysToGenerate) * (number * 10); i++)
        {
            if (pendingRandomKeys <= 0)
                break;
            HashFunction hf = Hashing.murmur3_128();
            HashCode hashedKey = hf.newHasher().putString(seed, Charset.defaultCharset()).putInt(i).hash();
            for (int t = 0; t < tokens.length; t++)
            {
                // The key belongs to split t if its token falls in
                // [tokens[t], tokens[t + 1]), or at/past tokens[t] for the last split
                if ((t + 1 == tokens.length && hashedKey.asLong() >= tokens[t])
                    || (hashedKey.asLong() >= tokens[t] && hashedKey.asLong() < tokens[t + 1]))
                {
                    if (randomKeys[t] == null)
                    {
                        randomKeys[t] = hashedKey;
                        pendingRandomKeys--;
                    }
                    break;
                }
            }
        }
    }
    return Arrays.asList(randomKeys);
}
{code}
There are 12 Cassandra instances in each performance/stress cluster, running
JDK 1.8.0_74 with the CMS collector and -Xms5G -Xmx5G -Xmn1G (GC settings
obviously simplified here).
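For reference, a minimal set of JVM arguments consistent with the above would look something like the following (the heap and new-gen sizes are from the test setup; the remaining CMS flags are my assumption of typical Cassandra CMS defaults, not the exact settings used):
{code}
-Xms5G -Xmx5G -Xmn1G
-XX:+UseParNewGC
-XX:+UseConcMarkSweepGC
-XX:+CMSParallelRemarkEnabled
-XX:CMSInitiatingOccupancyFraction=75
-XX:+UseCMSInitiatingOccupancyOnly
{code}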
The test keyspace is created with RF=3:
{code:SQL}
CREATE KEYSPACE IF NOT EXISTS test_keyspace
WITH replication = {'class': 'NetworkTopologyStrategy', 'datacenter1': 3};
{code}
Operations for test_keyspace.timeuuid1 generate a new key to insert and read
from at the top of every iteration with generateRandomKeys(1). Each worker then
generates 10,000 random mutations, each with the current timeuuid and a random
value blob of 30 bytes to 2kb. This is intended to put some more "normal" load
on the cluster.
{code:SQL}
CREATE TABLE IF NOT EXISTS test_keyspace.timeuuid1 (name text, col1 timeuuid, value blob, primary key(name, col1))
WITH compaction = { 'class':'LeveledCompactionStrategy' };

"INSERT INTO test_keyspace.timeuuid1 (name, col1, value) VALUES (?, ?, ?)"
"SELECT * FROM test_keyspace.timeuuid1 WHERE name = ? and col1 = ?"
{code}
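Concretely, one worker iteration for this workload looks roughly like the sketch below (session, insertStmt, and the surrounding scaffolding are illustrative stand-ins, not the actual tool code):
{code:java}
// One iteration of the timeuuid1 workload: pick a key, then write 10,000
// mutations, each with the current timeuuid and a 30 byte to ~2kb random blob.
HashCode key = generateRandomKeys(1).get(0);
for (int i = 0; i < 10000; i++)
{
    UUID now = UUIDs.timeBased(); // driver utility for the current timeuuid
    byte[] value = new byte[30 + RANDOM_THREAD_LOCAL.get().nextInt(2048 - 30)];
    RANDOM_THREAD_LOCAL.get().nextBytes(value);
    session.execute(insertStmt.bind(key.toString(), now, ByteBuffer.wrap(value)));
}
{code}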
The second and third workloads attempt to stress the large-partition element of
this work. The goal here is to create infinitely growing partitions.
test_keyspace.largetext1 and test_keyspace.largeuuid1 are largely the same,
except that test_keyspace.largetext1 is also intended to stress the Overflow
logic for large composite keys. A key design element of Birch is its support
for variable-length keys. Cassandra supports row keys up to the maximum value
of an unsigned short (64KB) in length. To keep performance in the tree
implementation predictable, however, we can't treat keys that large as first
class citizens: doing so would adversely hurt the performance of the 99.999%
of normal-sized keys. So, to support these large keys without hurting the
performance of normal-sized keys, a Birch node/leaf will inline at most
((size_of_leaf_node / 2) / 2) bytes of a key, where size_of_leaf_node is 4kb
by default and we divide by 2 to accommodate serializing/inserting at least 2
elements in a single node. The result is that keys of length <= 1kb are
supported without any special handling, which should cover the use cases of
almost everyone in the world.
For keys that exceed that length, the remaining bytes are written into a
single Overflow page, which is shared between all inner + leaf nodes and is not
page aligned. This means we keep 1kb worth of the key (assuming a 4kb
Birch node size) inside the node itself and the rest in the Overflow page. If
we need to read that key, we grab the bytes from the node + overflow page
inline during the tree operation and reassemble the entire variable-length
key. This has a slight performance cost (of course), as it requires the
allocation of an additional byte[], an additional seek, and additional reads.
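A minimal sketch of that split (the names and the two-array return shape are illustrative only; the actual Birch serialization differs in its details):
{code:java}
// Illustrative fragment (assumes java.util.Arrays): split a key into the
// portion inlined in the node and the remainder spilled to the Overflow page.
static final int LEAF_NODE_SIZE = 4096;                     // 4kb default
static final int MAX_INLINE_KEY = (LEAF_NODE_SIZE / 2) / 2; // 1kb

// Returns {inlineBytes, overflowBytes}; overflowBytes is empty for keys <= 1kb,
// so the common case never touches the Overflow page at all.
static byte[][] splitKey(byte[] key)
{
    if (key.length <= MAX_INLINE_KEY)
        return new byte[][] { key, new byte[0] };
    return new byte[][] { Arrays.copyOfRange(key, 0, MAX_INLINE_KEY),
                          Arrays.copyOfRange(key, MAX_INLINE_KEY, key.length) };
}
{code}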
To exercise this, col1 in test_keyspace.largetext1 is a randomly generated
string from 300 bytes to 4kb -- and to see the performance *without* the
Overflow logic (which will almost always be the case in real life, as row keys
> 1kb are pretty ridiculous ;) ), test_keyspace.largeuuid1 uses a simple
randomly generated UUID for its primary key.
generateEvenlySpacedPredictableKeys() (see above) was written to generate a
predictable set of pseudo-random keys (where the same seed will generate the
same "random" keys). The logic is a bit complicated because I found that just
randomly generating n keys didn't guarantee the load would be evenly
distributed across the ring; a disproportionate number of the randomly
generated keys would land on a few instances. The goal here is to generate
evenly distributed keys that can be re-used even between launches of the
stress tool itself, to generate "infinitely" wide/large CQL partitions!
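For example (a hypothetical invocation; the counts and seed string are arbitrary, and the determinism assumes the ring's token ranges haven't changed between runs):
{code:java}
// Same seed + offset => the same "random" keys on every launch, so each run
// keeps appending to the same ever-growing CQL partitions.
List<HashCode> run1 = generateEvenlySpacedPredictableKeys(96, 0, "birch-perf-1", cluster);
List<HashCode> run2 = generateEvenlySpacedPredictableKeys(96, 0, "birch-perf-1", cluster);
assert run1.equals(run2);
{code}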
{code:SQL}
CREATE TABLE IF NOT EXISTS test_keyspace.largetext1 (name text, col1 text, value blob, primary key(name, col1))
WITH compaction = { 'class':'LeveledCompactionStrategy' };

CREATE TABLE IF NOT EXISTS test_keyspace.largeuuid1 (name text, col1 uuid, value uuid, primary key(name, col1))
WITH compaction = { 'class':'LeveledCompactionStrategy' };

"INSERT INTO test_keyspace.largetext1 (name, col1, value) VALUES (?, ?, ?)"
"SELECT * FROM test_keyspace.largetext1 WHERE name = ? and col1 = ?"
"INSERT INTO test_keyspace.largeuuid1 (name, col1, value) VALUES (?, ?, ?)"
"SELECT * FROM test_keyspace.largeuuid1 WHERE name = ? and col1 = ?"
{code}
The values for insert are generated lazily, which lets us insert large amounts
of data without incurring impossible memory and CPU costs on the
client/stress-tool side to generate them all up front (which is what
configuring a large partition with cassandra-stress attempts to do, and fails
at). I then sample the randomly generated values per iteration at a given
rate. Once I've inserted enough data to make a best effort at ensuring the
memtable has been flushed (so the read will come from disk, not the memtable),
I iterate through the samples, select those values, and validate that the
database returns exactly what I know I inserted. This ensures that replacing
such a critical part of Cassandra's storage engine hasn't broken correctness
-- which is obviously a paramount requirement above everything else.
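A rough sketch of that lazy-generate-and-sample pattern (the names, the Map-based sample store, and SAMPLE_RATE are illustrative, not the actual tool code):
{code:java}
// Lazily produces random value blobs so nothing is materialized up front;
// every SAMPLE_RATE-th value is retained for post-flush read validation.
static final int SAMPLE_RATE = 1000; // hypothetical sampling rate

static Iterator<byte[]> lazyValues(int count, Random random, Map<Integer, byte[]> samples)
{
    return new Iterator<byte[]>()
    {
        int produced = 0;

        public boolean hasNext() { return produced < count; }

        public byte[] next()
        {
            byte[] value = new byte[30 + random.nextInt(2048 - 30)];
            random.nextBytes(value);
            if (produced % SAMPLE_RATE == 0)
                samples.put(produced, value); // remember what we inserted
            produced++;
            return value;
        }
    };
}
{code}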
h2. Now, Some Graphs!
It's very easy to see the difference between the Birch and non-Birch (control)
clusters. With Birch, the read and write latencies are consistent regardless
of the size of the CQL partitions being written to and read from. GC counts
are very low, and when GC does run it's via very short ParNew runs, not long
STW CMS collections.
In comparison, the control cluster without Birch shows an upward trend in
latencies as the CQL partition size continues to grow. GC is very
unpredictable, with many frequent and long (200-300ms) STW CMS pauses.
Instances were also starting to OOM frequently while I was collecting
statistics. This makes it hard to get good comparison data, as the latencies
and counts that the cluster can drive aren't predictable at all between
instances restarting and randomly pausing for very extended lengths of time.
h3. Read, Write Counts and Latencies, and Java JVM GC Statistics with Birch
!perf_cluster_1_with_birch_read_latency_and_counts.png!
!perf_cluster_1_with_birch_write_latency_and_counts.png!
!perf_cluster_2_with_birch_read_latency_and_counts.png!
!perf_cluster_2_with_birch_write_latency_and_counts.png!
!gc_collection_times_with_birch.png!
!gc_counts_with_birch.png!
----
h3. Read, Write Counts and Latencies, and Java JVM GC Statistics without Birch
!perf_cluster_3_without_birch_read_latency_and_counts.png|thumbnail!
!perf_cluster_3_without_birch_write_latency_and_counts.png|thumbnail!
!gc_collection_times_without_birch.png|thumbnail!
!gc_counts_without_birch.png|thumbnail!
> Make index info heap friendly for large CQL partitions
> ------------------------------------------------------
>
> Key: CASSANDRA-9754
> URL: https://issues.apache.org/jira/browse/CASSANDRA-9754
> Project: Cassandra
> Issue Type: Improvement
> Reporter: sankalp kohli
> Assignee: Michael Kjellman
> Priority: Minor
> Fix For: 4.x
>
> Attachments: gc_collection_times_with_birch.png,
> gc_collection_times_without_birch.png, gc_counts_with_birch.png,
> gc_counts_without_birch.png,
> perf_cluster_1_with_birch_read_latency_and_counts.png,
> perf_cluster_1_with_birch_write_latency_and_counts.png,
> perf_cluster_2_with_birch_read_latency_and_counts.png,
> perf_cluster_2_with_birch_write_latency_and_counts.png,
> perf_cluster_3_without_birch_read_latency_and_counts.png,
> perf_cluster_3_without_birch_write_latency_and_counts.png
>
>
> Looking at a heap dump of 2.0 cluster, I found that majority of the objects
> are IndexInfo and its ByteBuffers. This is specially bad in endpoints with
> large CQL partitions. If a CQL partition is, say, 6.4GB, it will have 100K
> IndexInfo objects and 200K ByteBuffers. This will create a lot of churn for
> GC. Can this be improved by not creating so many objects?