This is an automated email from the ASF dual-hosted git repository. bdeggleston pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 247502c updateCoordinatorWriteLatencyTableMetric can produce misleading metrics 247502c is described below commit 247502c5d19c181bbe0a224da3ad6ebd0156f607 Author: David Capwell <dcapw...@gmail.com> AuthorDate: Tue Feb 11 18:41:21 2020 -0800 updateCoordinatorWriteLatencyTableMetric can produce misleading metrics Patch by David Capwell; Reviewed by Blake Eggleston for CASSANDRA-15569 --- CHANGES.txt | 1 + .../org/apache/cassandra/service/StorageProxy.java | 129 ++++++++++++++++----- .../apache/cassandra/metrics/TableMetricsTest.java | 108 +++++++++++++---- 3 files changed, 188 insertions(+), 50 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index c4481ae..5fe958c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0-alpha4 + * updateCoordinatorWriteLatencyTableMetric can produce misleading metrics (CASSANDRA-15569) * Added documentation for read repair and an example of full repair (CASSANDRA-15485) * Make cqlsh and cqlshlib Python 2 & 3 compatible (CASSANDRA-10190) * Added documentation for Full Query Logging (CASSANDRA-15475) diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 9fc6b52..0caee86 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -19,19 +19,37 @@ package org.apache.cassandra.service; import java.nio.ByteBuffer; import java.nio.file.Paths; -import java.util.*; -import java.util.concurrent.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.EnumMap; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; 
+import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import com.google.common.base.Preconditions; import com.google.common.cache.CacheLoader; -import com.google.common.collect.*; +import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; +import com.google.common.collect.PeekingIterator; import com.google.common.primitives.Ints; import com.google.common.util.concurrent.Uninterruptibles; - import org.apache.commons.lang3.StringUtils; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,38 +58,97 @@ import org.apache.cassandra.audit.FullQueryLoggerOptions; import org.apache.cassandra.batchlog.Batch; import org.apache.cassandra.batchlog.BatchlogManager; import org.apache.cassandra.concurrent.Stage; -import org.apache.cassandra.service.reads.AbstractReadExecutor; -import org.apache.cassandra.service.reads.DataResolver; -import org.apache.cassandra.service.reads.ReadCallback; -import org.apache.cassandra.service.reads.repair.ReadRepair; -import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.schema.Schema; -import org.apache.cassandra.schema.SchemaConstants; -import org.apache.cassandra.db.*; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.CounterMutation; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.HintedHandOffManager; +import org.apache.cassandra.db.IMutation; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.db.PartitionRangeReadCommand; +import 
org.apache.cassandra.db.ReadCommand; +import org.apache.cassandra.db.ReadExecutionController; +import org.apache.cassandra.db.ReadResponse; +import org.apache.cassandra.db.SinglePartitionReadCommand; +import org.apache.cassandra.db.TruncateRequest; +import org.apache.cassandra.db.WriteType; import org.apache.cassandra.db.filter.DataLimits; import org.apache.cassandra.db.filter.TombstoneOverwhelmingException; -import org.apache.cassandra.db.partitions.*; +import org.apache.cassandra.db.partitions.FilteredPartition; +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.db.partitions.PartitionIterators; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import org.apache.cassandra.db.rows.RowIterator; import org.apache.cassandra.db.view.ViewUtils; -import org.apache.cassandra.dht.*; -import org.apache.cassandra.exceptions.*; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.Bounds; +import org.apache.cassandra.dht.RingPosition; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.exceptions.CasWriteTimeoutException; +import org.apache.cassandra.exceptions.CasWriteUnknownResultException; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.exceptions.IsBootstrappingException; +import org.apache.cassandra.exceptions.OverloadedException; +import org.apache.cassandra.exceptions.ReadFailureException; +import org.apache.cassandra.exceptions.ReadTimeoutException; +import org.apache.cassandra.exceptions.RequestFailureException; +import org.apache.cassandra.exceptions.RequestFailureReason; +import org.apache.cassandra.exceptions.RequestTimeoutException; +import org.apache.cassandra.exceptions.UnavailableException; +import org.apache.cassandra.exceptions.WriteFailureException; +import org.apache.cassandra.exceptions.WriteTimeoutException; import 
org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.hints.Hint; import org.apache.cassandra.hints.HintsService; import org.apache.cassandra.index.Index; -import org.apache.cassandra.locator.*; -import org.apache.cassandra.metrics.*; -import org.apache.cassandra.net.*; +import org.apache.cassandra.locator.AbstractReplicationStrategy; +import org.apache.cassandra.locator.EndpointsForRange; +import org.apache.cassandra.locator.EndpointsForToken; +import org.apache.cassandra.locator.IEndpointSnitch; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.LocalStrategy; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.locator.ReplicaLayout; +import org.apache.cassandra.locator.ReplicaPlan; +import org.apache.cassandra.locator.ReplicaPlans; +import org.apache.cassandra.locator.Replicas; +import org.apache.cassandra.locator.TokenMetadata; +import org.apache.cassandra.metrics.CASClientRequestMetrics; +import org.apache.cassandra.metrics.CASClientWriteRequestMetrics; +import org.apache.cassandra.metrics.ClientRequestMetrics; +import org.apache.cassandra.metrics.ClientWriteRequestMetrics; +import org.apache.cassandra.metrics.ReadRepairMetrics; +import org.apache.cassandra.metrics.StorageMetrics; +import org.apache.cassandra.metrics.ViewWriteMetrics; +import org.apache.cassandra.net.ForwardingInfo; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.MessageFlag; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.RequestCallback; +import org.apache.cassandra.net.Verb; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.SchemaConstants; +import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.paxos.Commit; import org.apache.cassandra.service.paxos.PaxosState; import org.apache.cassandra.service.paxos.PrepareCallback; import org.apache.cassandra.service.paxos.ProposeCallback; -import 
org.apache.cassandra.net.Verb; +import org.apache.cassandra.service.reads.AbstractReadExecutor; +import org.apache.cassandra.service.reads.DataResolver; +import org.apache.cassandra.service.reads.ReadCallback; +import org.apache.cassandra.service.reads.repair.ReadRepair; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.triggers.TriggerExecutor; -import org.apache.cassandra.utils.*; import org.apache.cassandra.utils.AbstractIterator; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.MBeanWrapper; +import org.apache.cassandra.utils.MonotonicClock; +import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.UUIDGen; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.NANOSECONDS; @@ -980,14 +1057,12 @@ public class StorageProxy implements StorageProxyMBean try { - //TODO: Avoid giving same latency number for each CF in each mutation in a given set of mutations //We could potentially pass a callback into performWrite. And add callback provision for mutateCounter or mutateAtomically (sendToHintedEndPoints) //However, Trade off between write metric per CF accuracy vs performance hit due to callbacks. Similar issue exists with CoordinatorReadLatency metric. 
- mutations.forEach(mutation -> { - mutation.getTableIds().forEach(tableId -> { - Keyspace.open(mutation.getKeyspaceName()).getColumnFamilyStore(tableId).metric.coordinatorWriteLatency.update(latency, TimeUnit.NANOSECONDS); - }); - }); + mutations.stream() + .flatMap(m -> m.getTableIds().stream().map(tableId -> Keyspace.open(m.getKeyspaceName()).getColumnFamilyStore(tableId))) + .distinct() + .forEach(store -> store.metric.coordinatorWriteLatency.update(latency, TimeUnit.NANOSECONDS)); } catch (Exception ex) { diff --git a/test/unit/org/apache/cassandra/metrics/TableMetricsTest.java b/test/unit/org/apache/cassandra/metrics/TableMetricsTest.java index a3ae956..c5434fe 100644 --- a/test/unit/org/apache/cassandra/metrics/TableMetricsTest.java +++ b/test/unit/org/apache/cassandra/metrics/TableMetricsTest.java @@ -36,8 +36,8 @@ import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.service.EmbeddedCassandraService; -import static junit.framework.Assert.assertEquals; -import static junit.framework.Assert.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; @RunWith(OrderedJUnit4ClassRunner.class) public class TableMetricsTest extends SchemaLoader @@ -66,15 +66,23 @@ public class TableMetricsTest extends SchemaLoader private ColumnFamilyStore recreateTable() { - session.execute(String.format("DROP TABLE IF EXISTS %s.%s", KEYSPACE, TABLE)); - session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (id int, val1 text, val2 text, PRIMARY KEY(id, val1));", KEYSPACE, TABLE)); - return ColumnFamilyStore.getIfExists(KEYSPACE, TABLE); + return recreateTable(TABLE); } - private void executeBatch(boolean isLogged, int distinctPartitions, int statementsPerPartition) + private ColumnFamilyStore recreateTable(String table) { + session.execute(String.format("DROP TABLE IF EXISTS %s.%s", KEYSPACE, table)); + session.execute(String.format("CREATE TABLE 
IF NOT EXISTS %s.%s (id int, val1 text, val2 text, PRIMARY KEY(id, val1));", KEYSPACE, table)); + return ColumnFamilyStore.getIfExists(KEYSPACE, table); + } + + private void executeBatch(boolean isLogged, int distinctPartitions, int statementsPerPartition, String... tables) + { + if (tables == null || tables.length == 0) + { + tables = new String[] { TABLE }; + } BatchStatement.Type batchType; - PreparedStatement ps = session.prepare(String.format("INSERT INTO %s.%s (id, val1, val2) VALUES (?, ?, ?);", KEYSPACE, TABLE)); if (isLogged) { @@ -87,6 +95,16 @@ public class TableMetricsTest extends SchemaLoader BatchStatement batch = new BatchStatement(batchType); + for (String table : tables) + populateBatch(batch, table, distinctPartitions, statementsPerPartition); + + session.execute(batch); + } + + private static void populateBatch(BatchStatement batch, String table, int distinctPartitions, int statementsPerPartition) + { + PreparedStatement ps = session.prepare(String.format("INSERT INTO %s.%s (id, val1, val2) VALUES (?, ?, ?);", KEYSPACE, table)); + for (int i=0; i<distinctPartitions; i++) { for (int j=0; j<statementsPerPartition; j++) @@ -94,8 +112,6 @@ public class TableMetricsTest extends SchemaLoader batch.add(ps.bind(i, j + "a", "b")); } } - - session.execute(batch); } @Test @@ -103,7 +119,7 @@ public class TableMetricsTest extends SchemaLoader { ColumnFamilyStore cfs = recreateTable(); assertEquals(0, cfs.metric.coordinatorWriteLatency.getCount()); - assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() == 0); + assertEquals(0.0, cfs.metric.coordinatorWriteLatency.getMeanRate(), 0.0); for (int i = 0; i < 10; i++) { @@ -111,7 +127,7 @@ public class TableMetricsTest extends SchemaLoader } assertEquals(10, cfs.metric.coordinatorWriteLatency.getCount()); - assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() > 0); + assertGreaterThan(cfs.metric.coordinatorWriteLatency.getMeanRate(), 0); } @Test @@ -121,7 +137,7 @@ public class TableMetricsTest 
extends SchemaLoader PreparedStatement metricsStatement = session.prepare(String.format("INSERT INTO %s.%s (id, val1, val2) VALUES (?, ?, ?)", KEYSPACE, TABLE)); assertEquals(0, cfs.metric.coordinatorWriteLatency.getCount()); - assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() == 0); + assertEquals(0.0, cfs.metric.coordinatorWriteLatency.getMeanRate(), 0.0); for (int i = 0; i < 10; i++) { @@ -129,7 +145,7 @@ public class TableMetricsTest extends SchemaLoader } assertEquals(10, cfs.metric.coordinatorWriteLatency.getCount()); - assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() > 0); + assertGreaterThan(cfs.metric.coordinatorWriteLatency.getMeanRate(), 0); } @Test @@ -137,14 +153,36 @@ public class TableMetricsTest extends SchemaLoader { ColumnFamilyStore cfs = recreateTable(); assertEquals(0, cfs.metric.coordinatorWriteLatency.getCount()); - assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() == 0); + assertEquals(0.0, cfs.metric.coordinatorWriteLatency.getMeanRate(), 0.0); executeBatch(true, 10, 2); - assertEquals(10, cfs.metric.coordinatorWriteLatency.getCount()); + assertEquals(1, cfs.metric.coordinatorWriteLatency.getCount()); executeBatch(true, 20, 2); - assertEquals(30, cfs.metric.coordinatorWriteLatency.getCount()); - assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() > 0); + assertEquals(2, cfs.metric.coordinatorWriteLatency.getCount()); // 2 for previous batch and this batch + assertGreaterThan(cfs.metric.coordinatorWriteLatency.getMeanRate(), 0); + } + + @Test + public void testLoggedPartitionsPerBatchMultiTable() + { + ColumnFamilyStore first = recreateTable(); + assertEquals(0, first.metric.coordinatorWriteLatency.getCount()); + assertEquals(0.0, first.metric.coordinatorWriteLatency.getMeanRate(), 0.0); + + ColumnFamilyStore second = recreateTable(TABLE + "_second"); + assertEquals(0, second.metric.coordinatorWriteLatency.getCount()); + assertEquals(0.0, second.metric.coordinatorWriteLatency.getMeanRate(), 0.0); + + 
executeBatch(true, 10, 2, TABLE, TABLE + "_second"); + assertEquals(1, first.metric.coordinatorWriteLatency.getCount()); + assertEquals(1, second.metric.coordinatorWriteLatency.getCount()); + + executeBatch(true, 20, 2, TABLE, TABLE + "_second"); + assertEquals(2, first.metric.coordinatorWriteLatency.getCount()); // 2 for previous batch and this batch + assertEquals(2, second.metric.coordinatorWriteLatency.getCount()); // 2 for previous batch and this batch + assertGreaterThan(first.metric.coordinatorWriteLatency.getMeanRate(), 0); + assertGreaterThan(second.metric.coordinatorWriteLatency.getMeanRate(), 0); } @Test @@ -152,15 +190,35 @@ public class TableMetricsTest extends SchemaLoader { ColumnFamilyStore cfs = recreateTable(); assertEquals(0, cfs.metric.coordinatorWriteLatency.getCount()); - assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() == 0); + assertEquals(0.0, cfs.metric.coordinatorWriteLatency.getMeanRate(), 0.0); executeBatch(false, 5, 3); - assertEquals(5, cfs.metric.coordinatorWriteLatency.getCount()); + assertEquals(1, cfs.metric.coordinatorWriteLatency.getCount()); executeBatch(false, 25, 2); - assertEquals(30, cfs.metric.coordinatorWriteLatency.getCount()); - assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() > 0); + assertEquals(2, cfs.metric.coordinatorWriteLatency.getCount()); // 2 for previous batch and this batch + assertGreaterThan(cfs.metric.coordinatorWriteLatency.getMeanRate(), 0); + } + @Test + public void testUnloggedPartitionsPerBatchMultiTable() + { + ColumnFamilyStore first = recreateTable(); + assertEquals(0, first.metric.coordinatorWriteLatency.getCount()); + assertEquals(0.0, first.metric.coordinatorWriteLatency.getMeanRate(), 0.0); + + ColumnFamilyStore second = recreateTable(TABLE + "_second"); + assertEquals(0, second.metric.coordinatorWriteLatency.getCount()); + assertEquals(0.0, second.metric.coordinatorWriteLatency.getMeanRate(), 0.0); + + executeBatch(false, 5, 3, TABLE, TABLE + "_second"); + 
assertEquals(1, first.metric.coordinatorWriteLatency.getCount()); + + executeBatch(false, 25, 2, TABLE, TABLE + "_second"); + assertEquals(2, first.metric.coordinatorWriteLatency.getCount()); // 2 for previous batch and this batch + assertEquals(2, second.metric.coordinatorWriteLatency.getCount()); // 2 for previous batch and this batch + assertGreaterThan(first.metric.coordinatorWriteLatency.getMeanRate(), 0); + assertGreaterThan(second.metric.coordinatorWriteLatency.getMeanRate(), 0); } @Test @@ -168,9 +226,13 @@ public class TableMetricsTest extends SchemaLoader { ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(KEYSPACE, COUNTER_TABLE); assertEquals(0, cfs.metric.coordinatorWriteLatency.getCount()); - assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() == 0); + assertEquals(0.0, cfs.metric.coordinatorWriteLatency.getMeanRate(), 0.0); session.execute(String.format("UPDATE %s.%s SET id_c = id_c + 1 WHERE id = 1 AND val = 'val1'", KEYSPACE, COUNTER_TABLE)); assertEquals(1, cfs.metric.coordinatorWriteLatency.getCount()); - assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() > 0); + assertGreaterThan(cfs.metric.coordinatorWriteLatency.getMeanRate(), 0); + } + + private static void assertGreaterThan(double actual, double expectedLessThan) { + assertTrue("Expected " + actual + " > " + expectedLessThan, actual > expectedLessThan); } } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org For additional commands, e-mail: commits-help@cassandra.apache.org