This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new eefb5a8 [Issue 3806] Fix NPE while call PartitionedProducerImpl.getStats() (#3829)
eefb5a8 is described below
commit eefb5a8352a54ce9f0e6f93fba8f05d57f4077eb
Author: Fangbin Sun <[email protected]>
AuthorDate: Wed Mar 20 01:37:57 2019 +0800
[Issue 3806] Fix NPE while call PartitionedProducerImpl.getStats() (#3829)
* Fix NPE while call PartitionedProducerImpl.getStats().
* Add unit tests and fix NPE while call MultiTopicsConsumerImpl.getStats()
---
.../client/impl/ConsumerStatsRecorderImpl.java | 20 +++----
.../client/impl/ProducerStatsRecorderImpl.java | 18 +++---
.../client/impl/MultiTopicsConsumerImplTest.java | 65 ++++++++++++++++++++++
.../client/impl/PartitionedProducerImplTest.java | 37 +++++++++++-
4 files changed, 120 insertions(+), 20 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
index e40d7ce..ae6bbe4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
@@ -63,16 +63,16 @@ public class ConsumerStatsRecorderImpl implements ConsumerStatsRecorder {
private static final DecimalFormat THROUGHPUT_FORMAT = new DecimalFormat("0.00");
public ConsumerStatsRecorderImpl() {
- numMsgsReceived = null;
- numBytesReceived = null;
- numReceiveFailed = null;
- numAcksSent = null;
- numAcksFailed = null;
- totalMsgsReceived = null;
- totalBytesReceived = null;
- totalReceiveFailed = null;
- totalAcksSent = null;
- totalAcksFailed = null;
+ numMsgsReceived = new LongAdder();
+ numBytesReceived = new LongAdder();
+ numReceiveFailed = new LongAdder();
+ numAcksSent = new LongAdder();
+ numAcksFailed = new LongAdder();
+ totalMsgsReceived = new LongAdder();
+ totalBytesReceived = new LongAdder();
+ totalReceiveFailed = new LongAdder();
+ totalAcksSent = new LongAdder();
+ totalAcksFailed = new LongAdder();
}
public ConsumerStatsRecorderImpl(PulsarClientImpl pulsarClient, ConsumerConfigurationData<?> conf,
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
index cb60a52..15e8f53 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
@@ -64,15 +64,15 @@ public class ProducerStatsRecorderImpl implements ProducerStatsRecorder {
private static final double[] PERCENTILES = { 0.5, 0.75, 0.95, 0.99, 0.999, 1.0 };
public ProducerStatsRecorderImpl() {
- numMsgsSent = null;
- numBytesSent = null;
- numSendFailed = null;
- numAcksReceived = null;
- totalMsgsSent = null;
- totalBytesSent = null;
- totalSendFailed = null;
- totalAcksReceived = null;
- ds = null;
+ numMsgsSent = new LongAdder();
+ numBytesSent = new LongAdder();
+ numSendFailed = new LongAdder();
+ numAcksReceived = new LongAdder();
+ totalMsgsSent = new LongAdder();
+ totalBytesSent = new LongAdder();
+ totalSendFailed = new LongAdder();
+ totalAcksReceived = new LongAdder();
+ ds = DoublesSketch.builder().build(256);
}
public ProducerStatsRecorderImpl(PulsarClientImpl pulsarClient, ProducerConfigurationData conf,
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
new file mode 100644
index 0000000..816dc85
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import com.google.common.collect.Sets;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Unit Tests of {@link MultiTopicsConsumerImpl}.
+ */
+public class MultiTopicsConsumerImplTest {
+
+ @Test
+ public void testGetStats() throws Exception {
+ String topicName = "test-stats";
+ ClientConfigurationData conf = new ClientConfigurationData();
+ conf.setServiceUrl("pulsar://localhost:6650");
+ conf.setStatsIntervalSeconds(100);
+
+ ThreadFactory threadFactory = new DefaultThreadFactory("client-test-stats", Thread.currentThread().isDaemon());
+ EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory);
+ ExecutorService listenerExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
+
+ PulsarClientImpl clientImpl = new PulsarClientImpl(conf, eventLoopGroup);
+
+ ConsumerConfigurationData consumerConfData = new ConsumerConfigurationData();
+ consumerConfData.setTopicNames(Sets.newHashSet(topicName));
+
+ assertEquals(Long.parseLong("100"), clientImpl.getConfiguration().getStatsIntervalSeconds());
+
+ MultiTopicsConsumerImpl impl = new MultiTopicsConsumerImpl(
+ clientImpl, consumerConfData,
+ listenerExecutor, null, null, null);
+
+ impl.getStats();
+ }
+
+}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
index e28e471..65e8df3 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
@@ -18,19 +18,29 @@
*/
package org.apache.pulsar.client.impl;
+import io.netty.channel.EventLoopGroup;
import io.netty.util.Timer;
-import org.apache.pulsar.client.api.*;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
import java.lang.reflect.Field;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadFactory;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
@@ -111,4 +121,29 @@ public class PartitionedProducerImplTest {
}
}
+ @Test
+ public void testGetStats() throws Exception {
+ String topicName = "test-stats";
+ ClientConfigurationData conf = new ClientConfigurationData();
+ conf.setServiceUrl("pulsar://localhost:6650");
+ conf.setStatsIntervalSeconds(100);
+
+ ThreadFactory threadFactory = new DefaultThreadFactory("client-test-stats", Thread.currentThread().isDaemon());
+ EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory);
+
+ PulsarClientImpl clientImpl = new PulsarClientImpl(conf, eventLoopGroup);
+
+ ProducerConfigurationData producerConfData = new ProducerConfigurationData();
+ producerConfData.setMessageRoutingMode(MessageRoutingMode.CustomPartition);
+ producerConfData.setCustomMessageRouter(new CustomMessageRouter());
+
+ assertEquals(Long.parseLong("100"), clientImpl.getConfiguration().getStatsIntervalSeconds());
+
+ PartitionedProducerImpl impl = new PartitionedProducerImpl(
+ clientImpl, topicName, producerConfData,
+ 1, null, null, null);
+
+ impl.getStats();
+ }
+
}