This is an automated email from the ASF dual-hosted git repository.

liubao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git

commit 5b3878752627702355f62012b2d6a9e97a2e2111
Author: wujimin <[email protected]>
AuthorDate: Mon Nov 12 16:51:58 2018 +0800

    [SCB-1021] add vertx client/server meters
---
 .../core/transport/AbstractTransport.java          |  8 +-
 .../vertx/metrics/DefaultVertxMetrics.java         | 17 +---
 .../metric/DefaultClientEndpointMetricManager.java | 13 ++-
 .../metrics/core/VertxMetersInitializer.java       | 61 ++++++++++++++
 .../metrics/core/meter/vertx/EndpointMeter.java    | 96 ++++++++++++++++++++++
 .../core/meter/vertx/VertxEndpointsMeter.java      | 79 ++++++++++++++++++
 ...rvicecomb.foundation.metrics.MetricsInitializer |  3 +-
 7 files changed, 257 insertions(+), 20 deletions(-)

diff --git a/core/src/main/java/org/apache/servicecomb/core/transport/AbstractTransport.java b/core/src/main/java/org/apache/servicecomb/core/transport/AbstractTransport.java
index 519fc1d..2d6ab98 100644
--- a/core/src/main/java/org/apache/servicecomb/core/transport/AbstractTransport.java
+++ b/core/src/main/java/org/apache/servicecomb/core/transport/AbstractTransport.java
@@ -59,9 +59,13 @@ public abstract class AbstractTransport implements Transport {
   private static final long DEFAULT_TIMEOUT_MILLIS = 30000;
 
   // 所有transport使用同一个vertx实例,避免创建太多的线程
-  public static TransportVertxFactory transportVertxFactory = new 
TransportVertxFactory();
+  private static TransportVertxFactory transportVertxFactory = new 
TransportVertxFactory();
 
-  protected Vertx transportVertx = transportVertxFactory.getTransportVertx();
+  public static TransportVertxFactory getTransportVertxFactory() {
+    return transportVertxFactory;
+  }
+
+  protected Vertx transportVertx = 
getTransportVertxFactory().getTransportVertx();
 
   protected Endpoint endpoint;
 
diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/DefaultVertxMetrics.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/DefaultVertxMetrics.java
index e68e418..41cec77 100644
--- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/DefaultVertxMetrics.java
+++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/DefaultVertxMetrics.java
@@ -49,8 +49,8 @@ public class DefaultVertxMetrics extends DummyVertxMetrics {
   public DefaultVertxMetrics(Vertx vertx, VertxOptions vertxOptions) {
     this.vertx = vertx;
     this.vertxOptions = vertxOptions;
-    // can not create DefaultClientEndpointMetricManager in this time
-    // because vertx is not inited
+    this.clientEndpointMetricManager = new 
DefaultClientEndpointMetricManager(vertx,
+        (MetricsOptionsEx) vertxOptions.getMetricsOptions());
   }
 
   public Vertx getVertx() {
@@ -65,17 +65,6 @@ public class DefaultVertxMetrics extends DummyVertxMetrics {
     return serverEndpointMetricMap;
   }
 
-  private void initClientEndpointMetricManager() {
-    if (clientEndpointMetricManager == null) {
-      synchronized (vertx) {
-        if (clientEndpointMetricManager == null) {
-          clientEndpointMetricManager = new 
DefaultClientEndpointMetricManager(vertx,
-              (MetricsOptionsEx) vertxOptions.getMetricsOptions());
-        }
-      }
-    }
-  }
-
   @Override
   public HttpServerMetrics<?, ?, ?> createMetrics(HttpServer server, 
SocketAddress localAddress,
       HttpServerOptions options) {
@@ -86,7 +75,6 @@ public class DefaultVertxMetrics extends DummyVertxMetrics {
 
   @Override
   public HttpClientMetrics<?, ?, ?, ?, ?> createMetrics(HttpClient client, 
HttpClientOptions options) {
-    initClientEndpointMetricManager();
     return new DefaultHttpClientMetrics(clientEndpointMetricManager);
   }
 
@@ -99,7 +87,6 @@ public class DefaultVertxMetrics extends DummyVertxMetrics {
 
   @Override
   public TCPMetrics<?> createMetrics(NetClientOptions options) {
-    initClientEndpointMetricManager();
     return new DefaultTcpClientMetrics(clientEndpointMetricManager);
   }
 
diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/metric/DefaultClientEndpointMetricManager.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/metric/DefaultClientEndpointMetricManager.java
index c014d57..aa25d13 100644
--- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/metric/DefaultClientEndpointMetricManager.java
+++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/metric/DefaultClientEndpointMetricManager.java
@@ -17,6 +17,7 @@
 package org.apache.servicecomb.foundation.vertx.metrics.metric;
 
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -29,6 +30,8 @@ import io.vertx.core.Vertx;
 import io.vertx.core.net.SocketAddress;
 
 public class DefaultClientEndpointMetricManager {
+  private final Vertx vertx;
+
   private final MetricsOptionsEx metricsOptionsEx;
 
   // to avoid save too many endpoint that not exist any more
@@ -37,10 +40,11 @@ public class DefaultClientEndpointMetricManager {
 
   private final ReadWriteLock rwlock = new ReentrantReadWriteLock();
 
+  private AtomicBoolean inited = new AtomicBoolean(false);
+
   public DefaultClientEndpointMetricManager(Vertx vertx, MetricsOptionsEx 
metricsOptionsEx) {
+    this.vertx = vertx;
     this.metricsOptionsEx = metricsOptionsEx;
-    
vertx.setPeriodic(metricsOptionsEx.getCheckClientEndpointMetricIntervalInMilliseconds(),
-        this::onCheckClientEndpointMetricExpired);
   }
 
   @VisibleForTesting
@@ -53,6 +57,11 @@ public class DefaultClientEndpointMetricManager {
   }
 
   public DefaultClientEndpointMetric onConnect(SocketAddress serverAddress) {
+    if (inited.compareAndSet(false, true)) {
+      
vertx.setPeriodic(metricsOptionsEx.getCheckClientEndpointMetricIntervalInMilliseconds(),
+          this::onCheckClientEndpointMetricExpired);
+    }
+
     rwlock.readLock().lock();
     try {
       DefaultClientEndpointMetric clientEndpointMetric = 
clientEndpointMetricMap
diff --git a/metrics/metrics-core/src/main/java/org/apache/servicecomb/metrics/core/VertxMetersInitializer.java b/metrics/metrics-core/src/main/java/org/apache/servicecomb/metrics/core/VertxMetersInitializer.java
new file mode 100644
index 0000000..bd3c567
--- /dev/null
+++ b/metrics/metrics-core/src/main/java/org/apache/servicecomb/metrics/core/VertxMetersInitializer.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.metrics.core;
+
+import org.apache.servicecomb.core.transport.AbstractTransport;
+import org.apache.servicecomb.foundation.metrics.MetricsBootstrapConfig;
+import org.apache.servicecomb.foundation.metrics.MetricsInitializer;
+import org.apache.servicecomb.foundation.metrics.registry.GlobalRegistry;
+import org.apache.servicecomb.metrics.core.meter.vertx.VertxEndpointsMeter;
+
+import com.google.common.eventbus.EventBus;
+import com.netflix.spectator.api.Id;
+import com.netflix.spectator.api.Registry;
+import com.netflix.spectator.api.SpectatorUtils;
+
+public class VertxMetersInitializer implements MetricsInitializer {
+  public static final String VERTX_ENDPOINTS = "servicecomb.vertx.endpoints";
+
+  public static final String ENDPOINTS_TYPE = "type";
+
+  public static final String ENDPOINTS_CLINET = "client";
+
+  public static final String ENDPOINTS_SERVER = "server";
+
+  @Override
+  public void init(GlobalRegistry globalRegistry, EventBus eventBus, 
MetricsBootstrapConfig config) {
+    Registry registry = globalRegistry.getDefaultRegistry();
+
+    Id endpointsId = registry.createId(VERTX_ENDPOINTS);
+    VertxEndpointsMeter clientMeter = new 
VertxEndpointsMeter(endpointsId.withTag(ENDPOINTS_TYPE, ENDPOINTS_CLINET),
+        AbstractTransport
+            .getTransportVertxFactory()
+            .getMetricsFactory()
+            .getVertxMetrics()
+            .getClientEndpointMetricManager()
+            .getClientEndpointMetricMap());
+    SpectatorUtils.registerMeter(registry, clientMeter);
+
+    VertxEndpointsMeter serverMeter = new 
VertxEndpointsMeter(endpointsId.withTag(ENDPOINTS_TYPE, ENDPOINTS_SERVER),
+        AbstractTransport
+            .getTransportVertxFactory()
+            .getMetricsFactory()
+            .getVertxMetrics()
+            .getServerEndpointMetricMap());
+    SpectatorUtils.registerMeter(registry, serverMeter);
+  }
+}
diff --git a/metrics/metrics-core/src/main/java/org/apache/servicecomb/metrics/core/meter/vertx/EndpointMeter.java b/metrics/metrics-core/src/main/java/org/apache/servicecomb/metrics/core/meter/vertx/EndpointMeter.java
new file mode 100644
index 0000000..e3e7cf5
--- /dev/null
+++ b/metrics/metrics-core/src/main/java/org/apache/servicecomb/metrics/core/meter/vertx/EndpointMeter.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.metrics.core.meter.vertx;
+
+import java.util.List;
+
+import 
org.apache.servicecomb.foundation.vertx.metrics.metric.DefaultEndpointMetric;
+
+import com.netflix.spectator.api.Id;
+import com.netflix.spectator.api.Measurement;
+
+public class EndpointMeter {
+  public static final String ADDRESS = "address";
+
+  public static final String STATISTIC = "statistic";
+
+  public static final String CONNECT_COUNT = "connectCount";
+
+  public static final String DISCONNECT_COUNT = "disconnectCount";
+
+  public static final String CONNECTIONS = "connections";
+
+  public static final String BYTES_READ = "bytesRead";
+
+  public static final String BYTES_WRITTEN = "bytesWritten";
+
+  private Id idConnect;
+
+  private Id idDisconnect;
+
+  private Id idConnections;
+
+  private Id idBytesRead;
+
+  private Id idBytesWritten;
+
+  private DefaultEndpointMetric metric;
+
+  private long lastConnectCount;
+
+  private long lastDisconnectCount;
+
+  private long lastBytesRead;
+
+  private long lastBytesWritten;
+
+  public EndpointMeter(Id id, DefaultEndpointMetric metric) {
+    id = id.withTag(ADDRESS, metric.getAddress().toString());
+    idConnect = id.withTag(STATISTIC, CONNECT_COUNT);
+    idDisconnect = id.withTag(STATISTIC, DISCONNECT_COUNT);
+    idConnections = id.withTag(STATISTIC, CONNECTIONS);
+    idBytesRead = id.withTag(STATISTIC, BYTES_READ);
+    idBytesWritten = id.withTag(STATISTIC, BYTES_WRITTEN);
+    this.metric = metric;
+  }
+
+  public DefaultEndpointMetric getMetric() {
+    return metric;
+  }
+
+  private Measurement newMeasurement(Id id, long timestamp, Number n) {
+    return new Measurement(id, timestamp, n.doubleValue());
+  }
+
+  public void calcMeasurements(List<Measurement> measurements, long msNow, 
double secondInterval) {
+    long connectCount = metric.getConnectCount();
+    long disconnectCount = metric.getDisconnectCount();
+    long bytesRead = metric.getBytesRead();
+    long bytesWritten = metric.getBytesWritten();
+
+    measurements.add(newMeasurement(idConnect, msNow, connectCount - 
lastConnectCount));
+    measurements.add(newMeasurement(idDisconnect, msNow, disconnectCount - 
lastDisconnectCount));
+    measurements.add(newMeasurement(idConnections, msNow, connectCount - 
disconnectCount));
+    measurements.add(newMeasurement(idBytesRead, msNow, (bytesRead - 
lastBytesRead) / secondInterval));
+    measurements.add(newMeasurement(idBytesWritten, msNow, (bytesWritten - 
lastBytesWritten) / secondInterval));
+
+    this.lastConnectCount = connectCount;
+    this.lastDisconnectCount = disconnectCount;
+    this.lastBytesRead = bytesRead;
+    this.lastBytesWritten = bytesWritten;
+  }
+}
diff --git a/metrics/metrics-core/src/main/java/org/apache/servicecomb/metrics/core/meter/vertx/VertxEndpointsMeter.java b/metrics/metrics-core/src/main/java/org/apache/servicecomb/metrics/core/meter/vertx/VertxEndpointsMeter.java
new file mode 100644
index 0000000..44014f8
--- /dev/null
+++ b/metrics/metrics-core/src/main/java/org/apache/servicecomb/metrics/core/meter/vertx/VertxEndpointsMeter.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.metrics.core.meter.vertx;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.servicecomb.foundation.common.concurrent.ConcurrentHashMapEx;
+import org.apache.servicecomb.foundation.metrics.meter.AbstractPeriodMeter;
+import 
org.apache.servicecomb.foundation.vertx.metrics.metric.DefaultEndpointMetric;
+
+import com.netflix.spectator.api.Id;
+import com.netflix.spectator.api.Measurement;
+
+import io.vertx.core.net.SocketAddress;
+
+public class VertxEndpointsMeter extends AbstractPeriodMeter {
+  private Map<SocketAddress, DefaultEndpointMetric> endpointMetricMap;
+
+  private Map<SocketAddress, EndpointMeter> endpointMeterMap = new 
ConcurrentHashMapEx<>();
+
+  @SuppressWarnings("unchecked")
+  public <T extends DefaultEndpointMetric> VertxEndpointsMeter(Id id, 
Map<SocketAddress, T> endpointMetricMap) {
+    this.id = id;
+    this.endpointMetricMap = (Map<SocketAddress, DefaultEndpointMetric>) 
endpointMetricMap;
+  }
+
+  @Override
+  public void calcMeasurements(long msNow, long secondInterval) {
+    List<Measurement> measurements = new ArrayList<>();
+    calcMeasurements(measurements, msNow, secondInterval);
+    allMeasurements = measurements;
+  }
+
+  @Override
+  public void calcMeasurements(List<Measurement> measurements, long msNow, 
long secondInterval) {
+    syncMeters();
+
+    for (EndpointMeter meter : endpointMeterMap.values()) {
+      meter.calcMeasurements(measurements, msNow, secondInterval);
+    }
+  }
+
+  private void syncMeters() {
+    for (EndpointMeter meter : endpointMeterMap.values()) {
+      if (!endpointMetricMap.containsKey(meter.getMetric().getAddress())) {
+        endpointMeterMap.remove(meter.getMetric().getAddress());
+      }
+    }
+    for (DefaultEndpointMetric metric : endpointMetricMap.values()) {
+      endpointMeterMap.computeIfAbsent(metric.getAddress(), addr -> new 
EndpointMeter(id, metric));
+    }
+  }
+
+  @Override
+  public Iterable<Measurement> measure() {
+    return allMeasurements;
+  }
+
+  @Override
+  public boolean hasExpired() {
+    return false;
+  }
+}
diff --git a/metrics/metrics-core/src/main/resources/META-INF/services/org.apache.servicecomb.foundation.metrics.MetricsInitializer b/metrics/metrics-core/src/main/resources/META-INF/services/org.apache.servicecomb.foundation.metrics.MetricsInitializer
index 3fed538..a8d82d0 100644
--- a/metrics/metrics-core/src/main/resources/META-INF/services/org.apache.servicecomb.foundation.metrics.MetricsInitializer
+++ b/metrics/metrics-core/src/main/resources/META-INF/services/org.apache.servicecomb.foundation.metrics.MetricsInitializer
@@ -19,5 +19,6 @@ org.apache.servicecomb.metrics.core.DefaultRegistryInitializer
 org.apache.servicecomb.metrics.core.InvocationMetersInitializer
 org.apache.servicecomb.metrics.core.ThreadPoolMetersInitializer
 org.apache.servicecomb.metrics.core.publish.DefaultLogPublisher
+org.apache.servicecomb.metrics.core.VertxMetersInitializer
 org.apache.servicecomb.metrics.core.OsMetersInitializer
-org.apache.servicecomb.metrics.core.publish.MetricsRestPublisher
\ No newline at end of file
+org.apache.servicecomb.metrics.core.publish.MetricsRestPublisher

Reply via email to