Repository: cassandra Updated Branches: refs/heads/trunk a8acb2a1b -> 31501cc8b
Add mutation size and batch metrics patch by Alwyn Davis; reviewed by Benjamin Lerer for CASSANDRA-12649 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3b84de4d Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3b84de4d Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3b84de4d Branch: refs/heads/trunk Commit: 3b84de4da4243eaf5d1353c7154fb22c866eff7b Parents: 8883554 Author: Alwyn Davis <[email protected]> Authored: Mon Dec 12 14:05:39 2016 +0100 Committer: Benjamin Lerer <[email protected]> Committed: Mon Dec 12 14:05:39 2016 +0100 ---------------------------------------------------------------------- CHANGES.txt | 3 +- doc/source/operating/metrics.rst | 25 ++++ .../cql3/statements/BatchStatement.java | 35 ++++-- src/java/org/apache/cassandra/db/IMutation.java | 16 +++ .../apache/cassandra/metrics/BatchMetrics.java | 38 ++++++ .../metrics/CASClientRequestMetrics.java | 5 - .../metrics/CASClientWriteRequestMetrics.java | 52 ++++++++ .../metrics/ClientWriteRequestMetrics.java | 47 ++++++++ .../apache/cassandra/service/StorageProxy.java | 16 ++- .../cassandra/metrics/BatchMetricsTest.java | 119 +++++++++++++++++++ 10 files changed, 336 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b84de4d/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6e41b2b..dfb849d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 3.12 - * add method to get size of endpoints to TokenMetadata (CASSANDRA-12999) + * Add mutation size and batch metrics (CASSANDRA-12649) + * Add method to get size of endpoints to TokenMetadata (CASSANDRA-12999) * Fix primary index calculation for SASI (CASSANDRA-12910) * Expose time spent waiting in thread pool queue (CASSANDRA-8398) * Conditionally update index built 
status to avoid unnecessary flushes (CASSANDRA-12969) http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b84de4d/doc/source/operating/metrics.rst ---------------------------------------------------------------------- diff --git a/doc/source/operating/metrics.rst b/doc/source/operating/metrics.rst index 546d9c2..ef43128 100644 --- a/doc/source/operating/metrics.rst +++ b/doc/source/operating/metrics.rst @@ -249,6 +249,7 @@ Reported name format: UnfinishedCommit Counter Number of transactions that were committed on write. ConditionNotMet Counter Number of transaction preconditions did not match current values. ContentionHistogram Histogram How many contended writes were encountered + MutationSizeHistogram Histogram Total size in bytes of the requests mutations. ===================== ============== ============================================================= @@ -286,6 +287,7 @@ Reported name format: Failures Counter Number of write failures encountered. |nbsp| Latency Write latency. Unavailables Counter Number of unavailable exceptions encountered. + MutationSizeHistogram Histogram Total size in bytes of the requests mutations. ===================== ============== ============================================================= @@ -585,6 +587,29 @@ connectedNativeClients Counter Number of clients connected to this n connectedThriftClients Counter Number of clients connected to this nodes thrift protocol server =========================== ============== =========== + +Batch Metrics +^^^^^^^^^^^^^ + +Metrics specific to batch statements. 
+ +Reported name format: + +**Metric Name** + ``org.apache.cassandra.metrics.Batch.<MetricName>`` + +**JMX MBean** + ``org.apache.cassandra.metrics:type=Batch name=<MetricName>`` + +=========================== ============== =========== +Name Type Description +=========================== ============== =========== +PartitionsPerCounterBatch Histogram Distribution of the number of partitions processed per counter batch +PartitionsPerLoggedBatch Histogram Distribution of the number of partitions processed per logged batch +PartitionsPerUnloggedBatch Histogram Distribution of the number of partitions processed per unlogged batch +=========================== ============== =========== + + JVM Metrics ^^^^^^^^^^^ http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b84de4d/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java index 83a8324..60a8df5 100644 --- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java @@ -35,6 +35,7 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.rows.RowIterator; import org.apache.cassandra.exceptions.*; +import org.apache.cassandra.metrics.BatchMetrics; import org.apache.cassandra.service.*; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.transport.messages.ResultMessage; @@ -79,6 +80,8 @@ public class BatchStatement implements CQLStatement "tables involved in an atomic batch might cause batchlog " + "entries to expire before being replayed."; + public static final BatchMetrics metrics = new BatchMetrics(); + /** * Creates a new BatchStatement from a list of statements and a * Thrift consistency level. 
@@ -259,7 +262,7 @@ public class BatchStatement implements CQLStatement /** * Checks batch size to ensure threshold is met. If not, a warning is logged. * - * @param updates - the batch mutations. + * @param mutations - the batch mutations. */ private static void verifyBatchSize(Collection<? extends IMutation> mutations) throws InvalidRequestException { @@ -267,14 +270,8 @@ public class BatchStatement implements CQLStatement if (mutations.size() <= 1) return; - long size = 0; long warnThreshold = DatabaseDescriptor.getBatchSizeWarnThreshold(); - - for (IMutation mutation : mutations) - { - for (PartitionUpdate update : mutation.getPartitionUpdates()) - size += update.dataSize(); - } + long size = IMutation.dataSize(mutations); if (size > warnThreshold) { @@ -369,10 +366,23 @@ public class BatchStatement implements CQLStatement verifyBatchSize(mutations); verifyBatchType(mutations); + updatePartitionsPerBatchMetrics(mutations.size()); + boolean mutateAtomic = (isLogged() && mutations.size() > 1); StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic, queryStartNanoTime); } + private void updatePartitionsPerBatchMetrics(int updatedPartitions) + { + if (isLogged()) { + metrics.partitionsPerLoggedBatch.update(updatedPartitions); + } else if (isCounter()) { + metrics.partitionsPerCounterBatch.update(updatedPartitions); + } else { + metrics.partitionsPerUnloggedBatch.update(updatedPartitions); + } + } + private ResultMessage executeWithConditions(BatchQueryOptions options, QueryState state, long queryStartNanoTime) throws RequestExecutionException, RequestValidationException { @@ -392,11 +402,16 @@ public class BatchStatement implements CQLStatement state.getClientState(), queryStartNanoTime)) { - return new ResultMessage.Rows(ModificationStatement.buildCasResultSet(ksName, tableName, result, columnsWithConditions, true, options.forStatement(0))); + + return new ResultMessage.Rows(ModificationStatement.buildCasResultSet(ksName, + tableName, + result, + 
columnsWithConditions, + true, + options.forStatement(0))); } } - private Pair<CQL3CasRequest,Set<ColumnDefinition>> makeCasRequest(BatchQueryOptions options, QueryState state) { long now = state.getTimestamp(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b84de4d/src/java/org/apache/cassandra/db/IMutation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/IMutation.java b/src/java/org/apache/cassandra/db/IMutation.java index c734e16..0ac89f7 100644 --- a/src/java/org/apache/cassandra/db/IMutation.java +++ b/src/java/org/apache/cassandra/db/IMutation.java @@ -31,4 +31,20 @@ public interface IMutation public long getTimeout(); public String toString(boolean shallow); public Collection<PartitionUpdate> getPartitionUpdates(); + + /** + * Computes the total data size of the specified mutations. + * @param mutations the mutations + * @return the total data size of the specified mutations + */ + public static long dataSize(Collection<? extends IMutation> mutations) + { + long size = 0; + for (IMutation mutation : mutations) + { + for (PartitionUpdate update : mutation.getPartitionUpdates()) + size += update.dataSize(); + } + return size; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b84de4d/src/java/org/apache/cassandra/metrics/BatchMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/BatchMetrics.java b/src/java/org/apache/cassandra/metrics/BatchMetrics.java new file mode 100644 index 0000000..9bea162 --- /dev/null +++ b/src/java/org/apache/cassandra/metrics/BatchMetrics.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.metrics; + +import com.codahale.metrics.Histogram; + +import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; + +public class BatchMetrics +{ + private static final MetricNameFactory factory = new DefaultNameFactory("Batch"); + + public final Histogram partitionsPerLoggedBatch; + public final Histogram partitionsPerUnloggedBatch; + public final Histogram partitionsPerCounterBatch; + + public BatchMetrics() + { + partitionsPerLoggedBatch = Metrics.histogram(factory.createMetricName("PartitionsPerLoggedBatch"), false); + partitionsPerUnloggedBatch = Metrics.histogram(factory.createMetricName("PartitionsPerUnloggedBatch"), false); + partitionsPerCounterBatch = Metrics.histogram(factory.createMetricName("PartitionsPerCounterBatch"), false); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b84de4d/src/java/org/apache/cassandra/metrics/CASClientRequestMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/CASClientRequestMetrics.java b/src/java/org/apache/cassandra/metrics/CASClientRequestMetrics.java index f3f1f64..9884ff1 100644 --- a/src/java/org/apache/cassandra/metrics/CASClientRequestMetrics.java +++ b/src/java/org/apache/cassandra/metrics/CASClientRequestMetrics.java @@ -23,12 +23,9 @@ import com.codahale.metrics.Histogram; import static 
org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; - public class CASClientRequestMetrics extends ClientRequestMetrics { public final Histogram contention; - /* Used only for write */ - public final Counter conditionNotMet; public final Counter unfinishedCommit; @@ -36,7 +33,6 @@ public class CASClientRequestMetrics extends ClientRequestMetrics { super(scope); contention = Metrics.histogram(factory.createMetricName("ContentionHistogram"), false); - conditionNotMet = Metrics.counter(factory.createMetricName("ConditionNotMet")); unfinishedCommit = Metrics.counter(factory.createMetricName("UnfinishedCommit")); } @@ -44,7 +40,6 @@ public class CASClientRequestMetrics extends ClientRequestMetrics { super.release(); Metrics.remove(factory.createMetricName("ContentionHistogram")); - Metrics.remove(factory.createMetricName("ConditionNotMet")); Metrics.remove(factory.createMetricName("UnfinishedCommit")); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b84de4d/src/java/org/apache/cassandra/metrics/CASClientWriteRequestMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/CASClientWriteRequestMetrics.java b/src/java/org/apache/cassandra/metrics/CASClientWriteRequestMetrics.java new file mode 100644 index 0000000..5971074 --- /dev/null +++ b/src/java/org/apache/cassandra/metrics/CASClientWriteRequestMetrics.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cassandra.metrics; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Histogram; + +import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; + +/** + * Metrics for tracking information about CAS write requests. + * + */ +public class CASClientWriteRequestMetrics extends CASClientRequestMetrics +{ + /** + * Metric for tracking the mutation sizes in bytes. + */ + public final Histogram mutationSize; + + public final Counter conditionNotMet; + + public CASClientWriteRequestMetrics(String scope) + { + super(scope); + mutationSize = Metrics.histogram(factory.createMetricName("MutationSizeHistogram"), false); + conditionNotMet = Metrics.counter(factory.createMetricName("ConditionNotMet")); + } + + public void release() + { + super.release(); + Metrics.remove(factory.createMetricName("ConditionNotMet")); + Metrics.remove(factory.createMetricName("MutationSizeHistogram")); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b84de4d/src/java/org/apache/cassandra/metrics/ClientWriteRequestMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/ClientWriteRequestMetrics.java b/src/java/org/apache/cassandra/metrics/ClientWriteRequestMetrics.java new file mode 100644 index 0000000..50427af --- /dev/null +++ b/src/java/org/apache/cassandra/metrics/ClientWriteRequestMetrics.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cassandra.metrics; + +import com.codahale.metrics.Histogram; + +import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; + +/** + * Metrics for tracking information about write requests. + * + */ +public class ClientWriteRequestMetrics extends ClientRequestMetrics +{ + /** + * Metric for tracking the mutation sizes in bytes. 
+ */ + public final Histogram mutationSize; + + public ClientWriteRequestMetrics(String scope) + { + super(scope); + mutationSize = Metrics.histogram(factory.createMetricName("MutationSizeHistogram"), false); + } + + public void release() + { + super.release(); + Metrics.remove(factory.createMetricName("MutationSizeHistogram")); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b84de4d/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index e0be68c..7d77bd4 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -99,12 +99,12 @@ public class StorageProxy implements StorageProxyMBean }; private static final ClientRequestMetrics readMetrics = new ClientRequestMetrics("Read"); private static final ClientRequestMetrics rangeMetrics = new ClientRequestMetrics("RangeSlice"); - private static final ClientRequestMetrics writeMetrics = new ClientRequestMetrics("Write"); - private static final CASClientRequestMetrics casWriteMetrics = new CASClientRequestMetrics("CASWrite"); + private static final ClientWriteRequestMetrics writeMetrics = new ClientWriteRequestMetrics("Write"); + private static final CASClientWriteRequestMetrics casWriteMetrics = new CASClientWriteRequestMetrics("CASWrite"); private static final CASClientRequestMetrics casReadMetrics = new CASClientRequestMetrics("CASRead"); private static final ViewWriteMetrics viewWriteMetrics = new ViewWriteMetrics("ViewWrite"); private static final Map<ConsistencyLevel, ClientRequestMetrics> readMetricsMap = new EnumMap<>(ConsistencyLevel.class); - private static final Map<ConsistencyLevel, ClientRequestMetrics> writeMetricsMap = new EnumMap<>(ConsistencyLevel.class); + private static final Map<ConsistencyLevel, 
ClientWriteRequestMetrics> writeMetricsMap = new EnumMap<>(ConsistencyLevel.class); private static final double CONCURRENT_SUBREQUESTS_MARGIN = 0.10; @@ -175,7 +175,7 @@ public class StorageProxy implements StorageProxyMBean for(ConsistencyLevel level : ConsistencyLevel.values()) { readMetricsMap.put(level, new ClientRequestMetrics("Read-" + level.name())); - writeMetricsMap.put(level, new ClientRequestMetrics("Write-" + level.name())); + writeMetricsMap.put(level, new ClientWriteRequestMetrics("Write-" + level.name())); } } @@ -273,6 +273,10 @@ public class StorageProxy implements StorageProxyMBean // TODO turn null updates into delete? PartitionUpdate updates = request.makeUpdates(current); + long size = updates.dataSize(); + casWriteMetrics.mutationSize.update(size); + writeMetricsMap.get(consistencyForPaxos).mutationSize.update(size); + // Apply triggers to cas updates. A consideration here is that // triggers emit Mutations, and so a given trigger implementation // may generate mutations for partitions other than the one this @@ -859,6 +863,10 @@ public class StorageProxy implements StorageProxyMBean .viewManager .updatesAffectView(mutations, true); + long size = IMutation.dataSize(mutations); + writeMetrics.mutationSize.update(size); + writeMetricsMap.get(consistencyLevel).mutationSize.update(size); + if (augmented != null) mutateAtomically(augmented, consistencyLevel, updatesView, queryStartNanoTime); else http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b84de4d/test/unit/org/apache/cassandra/metrics/BatchMetricsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/metrics/BatchMetricsTest.java b/test/unit/org/apache/cassandra/metrics/BatchMetricsTest.java new file mode 100644 index 0000000..60ee725 --- /dev/null +++ b/test/unit/org/apache/cassandra/metrics/BatchMetricsTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.metrics; + +import java.io.IOException; + +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; + +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; +import org.apache.cassandra.OrderedJUnit4ClassRunner; +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.service.EmbeddedCassandraService; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertTrue; +import static org.apache.cassandra.cql3.statements.BatchStatement.metrics; + +@RunWith(OrderedJUnit4ClassRunner.class) +public class BatchMetricsTest extends SchemaLoader +{ + private static EmbeddedCassandraService cassandra; + + private static Cluster cluster; + private static Session session; + + private static String KEYSPACE = "junit"; + private static final String TABLE = "batchmetricstest"; + + private static PreparedStatement ps; + + @BeforeClass() + public static void 
setup() throws ConfigurationException, IOException + { + Schema.instance.clear(); + + cassandra = new EmbeddedCassandraService(); + cassandra.start(); + + cluster = Cluster.builder().addContactPoint("127.0.0.1").withPort(DatabaseDescriptor.getNativeTransportPort()).build(); + session = cluster.connect(); + + session.execute("CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE + " WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };"); + session.execute("USE " + KEYSPACE); + session.execute("CREATE TABLE IF NOT EXISTS " + TABLE + " (id int PRIMARY KEY, val text);"); + + ps = session.prepare("INSERT INTO " + KEYSPACE + '.' + TABLE + " (id, val) VALUES (?, ?);"); + } + + private void executeBatch(boolean isLogged, int distinctPartitions, int statementsPerPartition) + { + BatchStatement.Type batchType; + + if (isLogged) { + batchType = BatchStatement.Type.LOGGED; + } else { + batchType = BatchStatement.Type.UNLOGGED; + } + + BatchStatement batch = new BatchStatement(batchType); + + for (int i=0; i<distinctPartitions; i++) { + for (int j=0; j<statementsPerPartition; j++) { + batch.add(ps.bind(i, "aaaaaaaa")); + } + } + + session.execute(batch); + } + + @Test + public void testLoggedPartitionsPerBatch() { + int partitionsPerBatchCountPre = (int) metrics.partitionsPerLoggedBatch.getCount(); + executeBatch(true, 10, 2); + assertEquals(partitionsPerBatchCountPre+1, metrics.partitionsPerLoggedBatch.getCount()); + assertTrue(partitionsPerBatchCountPre <= metrics.partitionsPerLoggedBatch.getSnapshot().getMax()); // decayingBuckets may not have exact value + + partitionsPerBatchCountPre = (int) metrics.partitionsPerLoggedBatch.getCount(); + executeBatch(true, 21, 2); + assertEquals(partitionsPerBatchCountPre+1, metrics.partitionsPerLoggedBatch.getCount()); + assertTrue(partitionsPerBatchCountPre <= metrics.partitionsPerLoggedBatch.getSnapshot().getMax()); + } + + @Test + public void testUnloggedPartitionsPerBatch() { + int partitionsPerBatchCountPre = (int) 
metrics.partitionsPerUnloggedBatch.getCount(); + executeBatch(false, 7, 2); + assertEquals(partitionsPerBatchCountPre+1, metrics.partitionsPerUnloggedBatch.getCount()); + assertTrue(partitionsPerBatchCountPre <= metrics.partitionsPerUnloggedBatch.getSnapshot().getMax()); + + partitionsPerBatchCountPre = (int) metrics.partitionsPerUnloggedBatch.getCount(); + executeBatch(false, 25, 2); + assertEquals(partitionsPerBatchCountPre+1, metrics.partitionsPerUnloggedBatch.getCount()); + assertTrue(partitionsPerBatchCountPre <= metrics.partitionsPerUnloggedBatch.getSnapshot().getMax()); + } +}
