This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new eefb5a8  [Issue 3806] Fix NPE while call 
PartitionedProducerImpl.getStats() (#3829)
eefb5a8 is described below

commit eefb5a8352a54ce9f0e6f93fba8f05d57f4077eb
Author: Fangbin Sun <[email protected]>
AuthorDate: Wed Mar 20 01:37:57 2019 +0800

    [Issue 3806] Fix NPE while call PartitionedProducerImpl.getStats() (#3829)
    
    * Fix NPE while call PartitionedProducerImpl.getStats().
    
    * Add unit tests and fix NPE while call MultiTopicsConsumerImpl.getStats()
---
 .../client/impl/ConsumerStatsRecorderImpl.java     | 20 +++----
 .../client/impl/ProducerStatsRecorderImpl.java     | 18 +++---
 .../client/impl/MultiTopicsConsumerImplTest.java   | 65 ++++++++++++++++++++++
 .../client/impl/PartitionedProducerImplTest.java   | 37 +++++++++++-
 4 files changed, 120 insertions(+), 20 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
index e40d7ce..ae6bbe4 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
@@ -63,16 +63,16 @@ public class ConsumerStatsRecorderImpl implements 
ConsumerStatsRecorder {
     private static final DecimalFormat THROUGHPUT_FORMAT = new 
DecimalFormat("0.00");
 
     public ConsumerStatsRecorderImpl() {
-        numMsgsReceived = null;
-        numBytesReceived = null;
-        numReceiveFailed = null;
-        numAcksSent = null;
-        numAcksFailed = null;
-        totalMsgsReceived = null;
-        totalBytesReceived = null;
-        totalReceiveFailed = null;
-        totalAcksSent = null;
-        totalAcksFailed = null;
+        numMsgsReceived = new LongAdder();
+        numBytesReceived = new LongAdder();
+        numReceiveFailed = new LongAdder();
+        numAcksSent = new LongAdder();
+        numAcksFailed = new LongAdder();
+        totalMsgsReceived = new LongAdder();
+        totalBytesReceived = new LongAdder();
+        totalReceiveFailed = new LongAdder();
+        totalAcksSent = new LongAdder();
+        totalAcksFailed = new LongAdder();
     }
 
     public ConsumerStatsRecorderImpl(PulsarClientImpl pulsarClient, 
ConsumerConfigurationData<?> conf,
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
index cb60a52..15e8f53 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
@@ -64,15 +64,15 @@ public class ProducerStatsRecorderImpl implements 
ProducerStatsRecorder {
     private static final double[] PERCENTILES = { 0.5, 0.75, 0.95, 0.99, 
0.999, 1.0 };
 
     public ProducerStatsRecorderImpl() {
-        numMsgsSent = null;
-        numBytesSent = null;
-        numSendFailed = null;
-        numAcksReceived = null;
-        totalMsgsSent = null;
-        totalBytesSent = null;
-        totalSendFailed = null;
-        totalAcksReceived = null;
-        ds = null;
+        numMsgsSent = new LongAdder();
+        numBytesSent = new LongAdder();
+        numSendFailed = new LongAdder();
+        numAcksReceived = new LongAdder();
+        totalMsgsSent = new LongAdder();
+        totalBytesSent = new LongAdder();
+        totalSendFailed = new LongAdder();
+        totalAcksReceived = new LongAdder();
+        ds = DoublesSketch.builder().build(256);
     }
 
     public ProducerStatsRecorderImpl(PulsarClientImpl pulsarClient, 
ProducerConfigurationData conf,
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
new file mode 100644
index 0000000..816dc85
--- /dev/null
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import com.google.common.collect.Sets;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Unit Tests of {@link MultiTopicsConsumerImpl}.
+ */
+public class MultiTopicsConsumerImplTest {
+
+    @Test
+    public void testGetStats() throws Exception {
+        String topicName = "test-stats";
+        ClientConfigurationData conf = new ClientConfigurationData();
+        conf.setServiceUrl("pulsar://localhost:6650");
+        conf.setStatsIntervalSeconds(100);
+
+        ThreadFactory threadFactory = new 
DefaultThreadFactory("client-test-stats", Thread.currentThread().isDaemon());
+        EventLoopGroup eventLoopGroup = 
EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory);
+        ExecutorService listenerExecutor = 
Executors.newSingleThreadScheduledExecutor(threadFactory);
+
+        PulsarClientImpl clientImpl = new PulsarClientImpl(conf, 
eventLoopGroup);
+
+        ConsumerConfigurationData consumerConfData = new 
ConsumerConfigurationData();
+        consumerConfData.setTopicNames(Sets.newHashSet(topicName));
+
+        assertEquals(Long.parseLong("100"), 
clientImpl.getConfiguration().getStatsIntervalSeconds());
+
+        MultiTopicsConsumerImpl impl = new MultiTopicsConsumerImpl(
+            clientImpl, consumerConfData,
+            listenerExecutor, null, null, null);
+
+        impl.getStats();
+    }
+
+}
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
index e28e471..65e8df3 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
@@ -18,19 +18,29 @@
  */
 package org.apache.pulsar.client.impl;
 
+import io.netty.channel.EventLoopGroup;
 import io.netty.util.Timer;
 
-import org.apache.pulsar.client.api.*;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TopicMetadata;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
 
 import java.lang.reflect.Field;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadFactory;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 
@@ -111,4 +121,29 @@ public class PartitionedProducerImplTest {
         }
     }
 
+    @Test
+    public void testGetStats() throws Exception {
+        String topicName = "test-stats";
+        ClientConfigurationData conf = new ClientConfigurationData();
+        conf.setServiceUrl("pulsar://localhost:6650");
+        conf.setStatsIntervalSeconds(100);
+
+        ThreadFactory threadFactory = new 
DefaultThreadFactory("client-test-stats", Thread.currentThread().isDaemon());
+        EventLoopGroup eventLoopGroup = 
EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory);
+
+        PulsarClientImpl clientImpl = new PulsarClientImpl(conf, 
eventLoopGroup);
+
+        ProducerConfigurationData producerConfData = new 
ProducerConfigurationData();
+        
producerConfData.setMessageRoutingMode(MessageRoutingMode.CustomPartition);
+        producerConfData.setCustomMessageRouter(new CustomMessageRouter());
+
+        assertEquals(Long.parseLong("100"), 
clientImpl.getConfiguration().getStatsIntervalSeconds());
+
+        PartitionedProducerImpl impl = new PartitionedProducerImpl(
+            clientImpl, topicName, producerConfData,
+            1, null, null, null);
+
+        impl.getStats();
+    }
+
 }

Reply via email to