This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 4dae643221a14d19209835eb17a800d681f1b952 Author: Rajan Dhabalia <[email protected]> AuthorDate: Tue Nov 1 12:57:11 2022 -0700 [improve] [client] Add api to get producer/consumer stats for partition topic (#18212) * [improve] [client] Add api to get producer/consumer stats for partition topic * introduce partition topic stats interface --- .../client/api/SimpleProducerConsumerStatTest.java | 61 +++++++++++++++++ .../pulsar/client/api/MultiTopicConsumerStats.java | 39 +++++++++++ .../client/api/PartitionedTopicProducerStats.java | 40 +++++++++++ .../apache/pulsar/client/api/ProducerStats.java | 1 - .../impl/MultiTopicConsumerStatsRecorderImpl.java | 59 ++++++++++++++++ .../client/impl/MultiTopicsConsumerImpl.java | 6 +- .../client/impl/PartitionedProducerImpl.java | 9 ++- .../PartitionedTopicProducerStatsRecorderImpl.java | 79 ++++++++++++++++++++++ .../pulsar/client/impl/ProducerStatsDisabled.java | 1 + .../client/impl/ProducerStatsRecorderImpl.java | 17 +---- .../client/impl/ProducerStatsRecorderImplTest.java | 4 +- 11 files changed, 292 insertions(+), 24 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java index 40d34a7c0b7..1c02f2ea57e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java @@ -19,12 +19,14 @@ package org.apache.pulsar.client.api; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import 
java.util.concurrent.CountDownLatch; @@ -453,4 +455,63 @@ public class SimpleProducerConsumerStatTest extends ProducerConsumerBase { .until(() -> producer.getStats().getPendingQueueSize() == numMessages); assertEquals(producer.getStats().getPendingQueueSize(), numMessages); } + + /** + * This test verifies partitioned topic stats for producer and consumer. + * @throws Exception + */ + @Test + public void testPartitionTopicStats() throws Exception { + log.info("-- Starting {} test --", methodName); + + String topicName = "persistent://my-property/my-ns/testPartitionTopicStats"; + int numPartitions = 10; + admin.topics().createPartitionedTopic(topicName, numPartitions); + + ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName) + .subscriptionName("my-subscriber-name"); + + Consumer<byte[]> consumer = consumerBuilder.subscribe(); + + ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().enableBatching(false).topic(topicName); + + Producer<byte[]> producer = producerBuilder.create(); + + int numMessages = 20; + for (int i = 0; i < numMessages; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } + + Message<byte[]> msg = null; + Set<String> messageSet = new HashSet<>(); + for (int i = 0; i < numMessages; i++) { + msg = consumer.receive(5, TimeUnit.SECONDS); + String receivedMessage = new String(msg.getData()); + log.info("Received message: [{}]", receivedMessage); + String expectedMessage = "my-message-" + i; + testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); + } + // Acknowledge the consumption of all messages at once + consumer.acknowledgeCumulative(msg); + + MultiTopicConsumerStats cStat = (MultiTopicConsumerStats) consumer.getStats(); + PartitionedTopicProducerStats pStat = (PartitionedTopicProducerStats) producer.getStats(); + retryStrategically((test) -> !pStat.getPartitionStats().isEmpty(), 5, 100); + retryStrategically((test) -> 
!cStat.getPartitionStats().isEmpty(), 5, 100); + Map<String, ProducerStats> prodStatsMap = pStat.getPartitionStats(); + Map<String, ConsumerStats> consStatsMap = cStat.getPartitionStats(); + assertFalse(prodStatsMap.isEmpty()); + assertFalse(consStatsMap.isEmpty()); + for (int i = 0; i < numPartitions; i++) { + String topic = topicName + "-partition-" + i; + assertTrue(prodStatsMap.containsKey(topic)); + assertTrue(consStatsMap.containsKey(topic)); + } + + consumer.close(); + producer.close(); + + log.info("-- Exiting {} test --", methodName); + } } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MultiTopicConsumerStats.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MultiTopicConsumerStats.java new file mode 100644 index 00000000000..e1c1d3372c3 --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MultiTopicConsumerStats.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.client.api; + +import java.util.Map; +import org.apache.pulsar.common.classification.InterfaceAudience; +import org.apache.pulsar.common.classification.InterfaceStability; + +/** + * Multi-topic Consumer statistics recorded by client. + * + * <p>All the stats are relative to the last recording period. The interval of the stats refreshes is configured with + * {@link ClientBuilder#statsInterval(long, java.util.concurrent.TimeUnit)} with a default of 1 minute. + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public interface MultiTopicConsumerStats extends ConsumerStats { + + /** + * @return stats for each partition if topic is partitioned topic + */ + Map<String, ConsumerStats> getPartitionStats(); +} diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PartitionedTopicProducerStats.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PartitionedTopicProducerStats.java new file mode 100644 index 00000000000..5fd3c1f34a0 --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PartitionedTopicProducerStats.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.pulsar.client.api; + +import java.util.Map; +import org.apache.pulsar.common.classification.InterfaceAudience; +import org.apache.pulsar.common.classification.InterfaceStability; + +/** + * Partitioned topic Producer statistics recorded by client. + * + * <p>All the stats are relative to the last recording period. The interval of the stats refreshes is configured with + * {@link ClientBuilder#statsInterval(long, java.util.concurrent.TimeUnit)} with a default of 1 minute. + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public interface PartitionedTopicProducerStats extends ProducerStats { + + /** + * @return stats for each partition if topic is partitioned topic + */ + Map<String, ProducerStats> getPartitionStats(); + +} diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerStats.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerStats.java index de028cddab4..30142c70480 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerStats.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerStats.java @@ -115,5 +115,4 @@ public interface ProducerStats extends Serializable { * @return current pending send-message queue size of the producer */ int getPendingQueueSize(); - } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicConsumerStatsRecorderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicConsumerStatsRecorderImpl.java new file mode 100644 index 00000000000..17018be02be --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicConsumerStatsRecorderImpl.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerStats; +import org.apache.pulsar.client.api.MultiTopicConsumerStats; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MultiTopicConsumerStatsRecorderImpl extends ConsumerStatsRecorderImpl implements MultiTopicConsumerStats { + + private static final long serialVersionUID = 1L; + private Map<String, ConsumerStats> partitionStats = new ConcurrentHashMap<>(); + + public MultiTopicConsumerStatsRecorderImpl() { + super(); + } + + public MultiTopicConsumerStatsRecorderImpl(Consumer<?> consumer) { + super(consumer); + } + + public MultiTopicConsumerStatsRecorderImpl(PulsarClientImpl pulsarClient, ConsumerConfigurationData<?> conf, + Consumer<?> consumer) { + super(pulsarClient, conf, consumer); + } + + public void updateCumulativeStats(String partition, ConsumerStats stats) { + super.updateCumulativeStats(stats); + partitionStats.put(partition, stats); + } + + @Override + public Map<String, ConsumerStats> getPartitionStats() { + return partitionStats; + } + + private static final Logger log = LoggerFactory.getLogger(MultiTopicConsumerStatsRecorderImpl.class); +} diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 741716919e4..138e8535bca 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -94,7 +94,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { private volatile Timeout partitionsAutoUpdateTimeout = null; TopicsPartitionChangedListener topicsPartitionChangedListener; CompletableFuture<Void> partitionsAutoUpdateFuture = null; - private final ConsumerStatsRecorder stats; + private final MultiTopicConsumerStatsRecorderImpl stats; private UnAckedMessageTracker unAckedMessageTracker; private final ConsumerConfigurationData<T> internalConfig; @@ -155,7 +155,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { this.internalConfig = getInternalConsumerConfig(); this.stats = client.getConfiguration().getStatsIntervalSeconds() > 0 - ? new ConsumerStatsRecorderImpl(this) + ? 
new MultiTopicConsumerStatsRecorderImpl(this) : null; // start track and auto subscribe partition increment @@ -863,7 +863,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { } stats.reset(); - consumers.values().stream().forEach(consumer -> stats.updateCumulativeStats(consumer.getStats())); + consumers.forEach((partition, consumer) -> stats.updateCumulativeStats(partition, consumer.getStats())); return stats; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java index 4a84ba03ebe..355a5395e74 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java @@ -62,7 +62,7 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> { private final ConcurrentOpenHashMap<Integer, ProducerImpl<T>> producers; private final MessageRouter routerPolicy; - private final ProducerStatsRecorderImpl stats; + private final PartitionedTopicProducerStatsRecorderImpl stats; private TopicMetadata topicMetadata; private final int firstPartitionIndex; private String overrideProducerName; @@ -80,7 +80,9 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> { ConcurrentOpenHashMap.<Integer, ProducerImpl<T>>newBuilder().build(); this.topicMetadata = new TopicMetadataImpl(numPartitions); this.routerPolicy = getMessageRouter(); - stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ProducerStatsRecorderImpl() : null; + stats = client.getConfiguration().getStatsIntervalSeconds() > 0 + ? 
new PartitionedTopicProducerStatsRecorderImpl() + : null; // MaxPendingMessagesAcrossPartitions doesn't support partial partition such as SinglePartition correctly int maxPendingMessages = Math.min(conf.getMaxPendingMessages(), @@ -353,7 +355,8 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> { return null; } stats.reset(); - producers.values().forEach(p -> stats.updateCumulativeStats(p.getStats())); + producers.forEach( + (partition, producer) -> stats.updateCumulativeStats(producer.getTopic(), producer.getStats())); return stats; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedTopicProducerStatsRecorderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedTopicProducerStatsRecorderImpl.java new file mode 100644 index 00000000000..2f73a6af406 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedTopicProducerStatsRecorderImpl.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.client.impl; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.DoubleAdder; +import org.apache.pulsar.client.api.PartitionedTopicProducerStats; +import org.apache.pulsar.client.api.ProducerStats; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PartitionedTopicProducerStatsRecorderImpl extends ProducerStatsRecorderImpl + implements PartitionedTopicProducerStats { + + private static final long serialVersionUID = 1L; + private Map<String, ProducerStats> partitionStats = Collections.emptyMap(); + private final DoubleAdder sendMsgsRateAggregate; + private final DoubleAdder sendBytesRateAggregate; + private int partitions = 0; + + public PartitionedTopicProducerStatsRecorderImpl() { + super(); + partitionStats = new ConcurrentHashMap<>(); + sendMsgsRateAggregate = new DoubleAdder(); + sendBytesRateAggregate = new DoubleAdder(); + } + + void reset() { + super.reset(); + partitions = 0; + } + + void updateCumulativeStats(String partition, ProducerStats stats) { + super.updateCumulativeStats(stats); + if (stats == null) { + return; + } + partitionStats.put(partition, stats); + // update rates + sendMsgsRateAggregate.add(stats.getSendMsgsRate()); + sendBytesRateAggregate.add(stats.getSendBytesRate()); + partitions++; + } + + @Override + public double getSendMsgsRate() { + return sendMsgsRateAggregate.doubleValue() / partitions; + } + + @Override + public double getSendBytesRate() { + return sendBytesRateAggregate.doubleValue() / partitions; + } + + @Override + public Map<String, ProducerStats> getPartitionStats() { + return partitionStats; + } + + private static final Logger log = LoggerFactory.getLogger(PartitionedTopicProducerStatsRecorderImpl.class); +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsDisabled.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsDisabled.java index 0962abb1aee..b85fc4cb6b3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsDisabled.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsDisabled.java @@ -132,4 +132,5 @@ public class ProducerStatsDisabled implements ProducerStatsRecorder { public int getPendingQueueSize() { return 0; } + } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java index 897193fff32..5a60dafb2b5 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java @@ -27,7 +27,6 @@ import io.netty.util.TimerTask; import java.io.IOException; import java.text.DecimalFormat; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.DoubleAdder; import java.util.concurrent.atomic.LongAdder; import org.apache.pulsar.client.api.ProducerStats; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; @@ -51,8 +50,6 @@ public class ProducerStatsRecorderImpl implements ProducerStatsRecorder { private final LongAdder totalBytesSent; private final LongAdder totalSendFailed; private final LongAdder totalAcksReceived; - private final DoubleAdder sendMsgsRateAggregate; - private final DoubleAdder sendBytesRateAggregate; private static final DecimalFormat DEC = new DecimalFormat("0.000"); private static final DecimalFormat THROUGHPUT_FORMAT = new DecimalFormat("0.00"); private final transient DoublesSketch ds; @@ -61,7 +58,6 @@ public class ProducerStatsRecorderImpl implements ProducerStatsRecorder { private volatile double sendMsgsRate; private volatile double sendBytesRate; - private int partitions = 0; private volatile double[] latencyPctValues = new 
double[PERCENTILES.length]; private volatile double[] batchSizePctValues = new double[PERCENTILES.length]; private volatile double[] msgSizePctValues = new double[PERCENTILES.length]; @@ -77,8 +73,6 @@ public class ProducerStatsRecorderImpl implements ProducerStatsRecorder { totalBytesSent = new LongAdder(); totalSendFailed = new LongAdder(); totalAcksReceived = new LongAdder(); - sendMsgsRateAggregate = new DoubleAdder(); - sendBytesRateAggregate = new DoubleAdder(); ds = DoublesSketch.builder().build(256); batchSizeDs = DoublesSketch.builder().build(256); msgSizeDs = DoublesSketch.builder().build(256); @@ -97,8 +91,6 @@ public class ProducerStatsRecorderImpl implements ProducerStatsRecorder { totalBytesSent = new LongAdder(); totalSendFailed = new LongAdder(); totalAcksReceived = new LongAdder(); - sendMsgsRateAggregate = new DoubleAdder(); - sendBytesRateAggregate = new DoubleAdder(); ds = DoublesSketch.builder().build(256); batchSizeDs = DoublesSketch.builder().build(256); msgSizeDs = DoublesSketch.builder().build(256); @@ -247,7 +239,6 @@ public class ProducerStatsRecorderImpl implements ProducerStatsRecorder { totalBytesSent.reset(); totalSendFailed.reset(); totalAcksReceived.reset(); - partitions = 0; } void updateCumulativeStats(ProducerStats stats) { @@ -262,10 +253,6 @@ public class ProducerStatsRecorderImpl implements ProducerStatsRecorder { totalBytesSent.add(stats.getTotalBytesSent()); totalSendFailed.add(stats.getTotalSendFailed()); totalAcksReceived.add(stats.getTotalAcksReceived()); - // update rates - sendMsgsRateAggregate.add(stats.getSendMsgsRate()); - sendBytesRateAggregate.add(stats.getSendBytesRate()); - partitions++; } @Override @@ -306,12 +293,12 @@ public class ProducerStatsRecorderImpl implements ProducerStatsRecorder { @Override public double getSendMsgsRate() { - return partitions != 0 ? 
sendMsgsRateAggregate.doubleValue() / partitions : sendMsgsRate; + return sendMsgsRate; } @Override public double getSendBytesRate() { - return partitions != 0 ? sendBytesRateAggregate.doubleValue() / partitions : sendBytesRate; + return sendBytesRate; } @Override diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java index 7cac1bf14f3..e7bef63a2cf 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java @@ -80,10 +80,10 @@ public class ProducerStatsRecorderImplTest { @Test public void testPartitionTopicAggegationStats() { ProducerStatsRecorderImpl recorder1 = spy(new ProducerStatsRecorderImpl()); - ProducerStatsRecorderImpl recorder2 = new ProducerStatsRecorderImpl(); + PartitionedTopicProducerStatsRecorderImpl recorder2 = new PartitionedTopicProducerStatsRecorderImpl(); when(recorder1.getSendMsgsRate()).thenReturn(1000.0); when(recorder1.getSendBytesRate()).thenReturn(1000.0); - recorder2.updateCumulativeStats(recorder1); + recorder2.updateCumulativeStats("test", recorder1); assertTrue(recorder2.getSendBytesRate() > 0); assertTrue(recorder2.getSendMsgsRate() > 0); }
