This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-website.git
The following commit(s) were added to refs/heads/master by this push:
new 88b8c545fd6 Add doc metrics chap.3 (#2739)
88b8c545fd6 is described below
commit 88b8c545fd602ef9f13b70ca87ceb25723bf387b
Author: namelessssssssssss
<[email protected]>
AuthorDate: Wed Aug 9 17:16:26 2023 +0800
Add doc metrics chap.3 (#2739)
* add metrics docs
* add metrics docs
* update metrics docs
* update docs
* update headers
* Update 1-指标样本的收集与存储.md
* Update 2-指标收集器的指标采集流程.md
* Fix and update metrics source docs.
* Add metrics source docs chap.3
---------
Co-authored-by: songxiaosheng <[email protected]>
Co-authored-by: Albumen Kevin <[email protected]>
---
...263\250\345\206\214\346\242\263\347\220\206.md" | 413 ++++++++++++++++++++-
.../metris-event-dispatch-full.png | Bin 0 -> 71245 bytes
2 files changed, 412 insertions(+), 1 deletion(-)
diff --git
"a/content/zh-cn/blog/java/codeanalysis/metrics/3-\346\214\207\346\240\207\347\233\221\345\220\254\346\263\250\345\206\214\346\242\263\347\220\206.md"
"b/content/zh-cn/blog/java/codeanalysis/metrics/3-\346\214\207\346\240\207\347\233\221\345\220\254\346\263\250\345\206\214\346\242\263\347\220\206.md"
index 66fd894f241..57d658f9759 100644
---
"a/content/zh-cn/blog/java/codeanalysis/metrics/3-\346\214\207\346\240\207\347\233\221\345\220\254\346\263\250\345\206\214\346\242\263\347\220\206.md"
+++
"b/content/zh-cn/blog/java/codeanalysis/metrics/3-\346\214\207\346\240\207\347\233\221\345\220\254\346\263\250\345\206\214\346\242\263\347\220\206.md"
@@ -7,4 +7,415 @@ tags: ["源码解析", "Java"]
description: "Dubbo 指标模块源码分析-指标监听注册梳理"
---
-连载中,敬请期待...
\ No newline at end of file
+## 三、指标监听注册梳理
+
+在前一章中,我们了解了不同收集器中的指标样本是如何被监听器添加进去的。接下来,我们将归纳指标监听器 的创建位置,及它们对应统计的指标。
+
+通过之前的分析,我们已经知道指标
注册事件多播器(RegistryMetricsEventMulticaster)中定义了并绑定了服务注册相关的指标。这种绑定操作同样存在于其它几个简单指标事件多播器(SimpleMetricsEventMulticaster)的几个实现中。
+
+
+
+### 转发器注册
+
+
+
+**RegistrySubDispatcher (服务注册指标转发器)注册了服务注册相关指标:**
+
+* 应用级实例注册成功/失败/总数计数 (APPLICATION_REGISTER_...)
+* 应用级服务接口订阅成功/失败/总数计数 (APPLICATION_SUBSCRIBE_...)
+* 服务级注册成功/失败/总数计数 (SERVICE_REGISTER_...)
+* 特殊的 APPLICATION_NOTIFY_FINISH 和 APPLICATION_DIRECTORY_POST (应用服务目录变化次数)
+
+
+
+**MetadataSubDispatcher(元数据指标转发器)注册应用元数据相关指标**
+
+* 应用推送元数据相关计数 (APPLICATION_PUSH_...)
+
+* 应用订阅元数据相关计数 (APPLICAITON_SUBSCRIBE_...)
+
+* 服务订阅元数据相关计数 (SERVICE_SUBSCRIBE_...)
+
+
+
+**ConfigCenterSubDispatcher (配置中心指标转发器) 注册配置中心配置更新次数指标**
+
+* 配置中心推送新配置次数 (CONFIGCENTER_METRIC_TOTAL)
+
+
+
+**DefaultSubDispatcher (默认转发器) 注册核心RPC调用次数指标**
+
+* 请求次数 (METRIC_REQUESTS)
+* 请求成功次数(METRIC_REQUESTS_SUCCEED)
+* 请求失败次数(METRIC_REQUEST_BUSINESS_FAILED)
+
+
+
+**MetricsDispatcher**
+
+MetricsDispatcher 较为特殊,它负责 ApplicationModel 下所有 MetricsCollector(前文中提到的指标收集器)
的初始化注册工作,并将它们添加到自己的监听器列表中。
+
+```java
+public class MetricsDispatcher extends SimpleMetricsEventMulticaster {
+
+ @SuppressWarnings({"rawtypes"})
+ public MetricsDispatcher(ApplicationModel applicationModel) {
+ ScopeBeanFactory beanFactory = applicationModel.getBeanFactory();
+ ExtensionLoader<MetricsCollector> extensionLoader =
applicationModel.getExtensionLoader(MetricsCollector.class);
+ if (extensionLoader != null) {
+ List<MetricsCollector> customizeCollectors = extensionLoader
+ .getActivateExtensions();
+ for (MetricsCollector customizeCollector : customizeCollectors) {
+ beanFactory.registerBean(customizeCollector);
+ }
+ customizeCollectors.forEach(this::addListener);
+ }
+ }
+
+}
+```
+
+
+
+需要注意,以上几个实现均继承自
SimpleMetricsEventMulticaster,因此它们都具有注册监听、转发事件的能力。它们将自己注册到对应领域的指标 Collector
中,并在收到指标事件时转发到自己注册的监听器中。
+
+```java
+//SimpleMetricsEventMulticaster
+
+ public void addListener(MetricsListener<?> listener) {
+ listeners.add(listener);
+ }
+
+ public void publishEvent(MetricsEvent event) {
+ if (event instanceof EmptyEvent) {
+ return;
+ }
+ if (validateIfApplicationConfigExist(event)) return;
+ for (MetricsListener listener : listeners) {
+ if (listener.isSupport(event)) {
+ listener.onEvent(event);
+ }
+ }
+ }
+//...
+```
+
+
+
+**SubDispatcher 和 Collector 之间的对应关系:**
+
+* MetadataSubDispatcher -> MetadataMetricsCollector 元数据指标事件
+* RegistrySubDispatcher -> RegistryMetricsCollector 服务注册指标事件
+* ConfigCenterSubDispatcher -> ConfigCenterMetricsCollector 配置中心指标事件
+* MetricsDispatcher 由 MetricsEventBus 通过 BeanFactory 加载。它是所有事件转发的入口。
+
+
+
+### 事件触发
+
+剩下的问题就是这些监听器是如何被触发的。
+
+可以发现三大中心的指标转发器都是在它们对应的Collector中创建的:
+
+```java
+public ConfigCenterMetricsCollector(ApplicationModel applicationModel) {
+ ...
+ super.setEventMulticaster(new ConfigCenterMetricsDispatcher(this));
+ ...
+}
+```
+
+```java
+public MetadataMetricsCollector(ApplicationModel applicationModel) {
+ ...
+ super.setEventMulticaster(new MetadataMetricsEventMulticaster(this));
+ ...
+}
+```
+
+```java
+public RegistryMetricsCollector(ApplicationModel applicationModel) {
+ ...
+ super.setEventMulticaster(new RegistryMetricsEventMulticaster(this));
+ ...
+}
+```
+
+这意味这想要通过它们发布事件,需要通过它们对应的 `Collector` 来访问。
+
+
+
+如前文所述, MetricsDispatcher 在初始化时会尝试获取并加载所有 MetricsCollector 的SPI拓展,
+
+三大中心的MetricsCollector (Metadata/Registry/ConfigCenter)也会在这里被初始化,并添加为
MetricsDispatcher 的监听器:
+
+```java
+ public MetricsDispatcher(ApplicationModel applicationModel) {
+...
+ ExtensionLoader<MetricsCollector> extensionLoader =
applicationModel.getExtensionLoader(MetricsCollector.class);
+...
+ for (MetricsCollector customizeCollector : customizeCollectors) {
+ beanFactory.registerBean(customizeCollector);
+ }
+ customizeCollectors.forEach(this::addListener);
+ }
+```
+
+对于 MetricsDispatcher,它由 MetricsEventBus 创建。而 MetricsEventBus
自身作为指标相关消息的总线,会接收所有指标消息,并将它们转发给监听者。
+
+
+
+MetricsEvenetBus 提供了三个方法来发布指标事件:
+
+* `publish(MetricsEvent event)` ,将事件发布给所有订阅者,只发布一次且不关心事件处理结果
+* `post(MetricsEvent event, Supplier<T> targetSupplier)`
,将事件发布给所有订阅者,并根据是否产生异常判断事件成功或失败,调用 MetricsDispatcher 发布对应的事件。
+* `post(MetricsEvent event, Supplier<T> targetSupplier, Function<T, Boolean>
trFunction)` ,额外的 trFunction 可用于通过业务结果判断事件成功或失败。 `targetSupplier`
为业务操作函数,泛型T为业务结果类型。
+
+这三个方法均会通过 MetricsDispatcher 来转发事件。
+
+
+
+在之前的分析中,我们知道 MetricsDispatcher 创建了所有 MetricsCollector 拓展,并将它们注册为自己的监听者。
+
+因此,当 MetricsEventBus 接收到发布的信息时,它会将信息转发到所有 MetricsCollector 中。对于
CombMetricsCollector 的实现,它们又会调用自己创建的 MetricsEventMulticaster 再次转发消息,到具体指标的监听器。
+
+之后,这些监听器就会根据自己的逻辑修改Collector中的指标计数。
+
+
+
+
+
+
+
+
+
+### 事件发布
+
+接下来,我们将寻找指标事件发布的源头。
+
+通过前文的分析,我们知道 MetricsEventBus 是所有指标事件发布的入口。具体来说,它有以下的用法:
+
+* AbstractDirectory
+* ServiceConfig
+* DefaultApplicationDeployer
+* ApolloDynamicConfiguration
+* NacosDynamicConfiguration
+* ZookeeperDataListener
+* AbstractMetadataReport
+
+我们将逐个分析每个用法。
+
+
+
+**AbstractDirectory**
+
+AbstractDirectory 在修改 Invoker 状态相关的操作完成后都会通过 MetricsEventBus 发布
refreshDirectoryEvent(服务目录更新事件,类型为 RegistryEvent ),将当前目录中各种状态 Invoker
实例的最新数量作为附件添加到 RegistryEvent 中。
+
+```java
+//AbstractDirectory
+
+ public void recoverDisabledInvoker(Invoker<T> invoker) {
+ ...
+
MetricsEventBus.publish(RegistryEvent.refreshDirectoryEvent(applicationModel,
getSummary()));
+ }
+
+
+ protected void setInvokers(BitList<Invoker<T>> invokers) {
+ ...
+
MetricsEventBus.publish(RegistryEvent.refreshDirectoryEvent(applicationModel,
getSummary()));
+ }
+
+ protected void setInvokers(BitList<Invoker<T>> invokers) {
+ ...
+
MetricsEventBus.publish(RegistryEvent.refreshDirectoryEvent(applicationModel,
getSummary()));
+ }
+
+ private Map<MetricsKey, Map<String, Integer>> getSummary() {
+ Map<MetricsKey, Map<String, Integer>> summaryMap = new HashMap<>();
+ //目录中可用的Invoker数量
+ summaryMap.put(MetricsKey.DIRECTORY_METRIC_NUM_VALID,
groupByServiceKey(getValidInvokers()));
+ //目录中不可用的Invoker数量
+ summaryMap.put(MetricsKey.DIRECTORY_METRIC_NUM_DISABLE,
groupByServiceKey(getDisabledInvokers()));
+ //目录中等待重连的Invoker数量
+ summaryMap.put(MetricsKey.DIRECTORY_METRIC_NUM_TO_RECONNECT,
groupByServiceKey(getInvokersToReconnect()));
+ summaryMap.put(MetricsKey.DIRECTORY_METRIC_NUM_ALL,
groupByServiceKey(getInvokers()));
+ return summaryMap;
+ }
+...
+```
+
+该事件最终会由 RegistryMetricsCollector 中的 RegistryMetricsDispatcher
转发到关系该事件的监听器中。**事件和监听器之间通过 MetricsKey匹配** 。
+
+最终,MetricsKey 为 `**DIRECTORY_METRIC_NUM_VALID**` 的监听器会处理这个事件,并更新 Collector中的计数。
+
+```java
+//DIRECTORY_METRIC_NUM_VALID 对应的 Listener。
+MetricsCat APPLICATION_DIRECTORY_POST = new
MetricsCat(MetricsKey.DIRECTORY_METRIC_NUM_VALID, (key, placeType, collector)
-> AbstractMetricsListener.onEvent(key,
+ event ->
+ {
+ Map<MetricsKey, Map<String, Integer>> summaryMap =
event.getAttachmentValue(ATTACHMENT_DIRECTORY_MAP);
+ summaryMap.forEach((metricsKey, map) ->
+ map.forEach(
+ (k, v) -> collector.setNum(metricsKey,
event.appName(), k, v)));
+ }
+ ));
+```
+
+这样,服务目录中不同状态 Invoker 的计数就通过 RegistryMetricsCollector 更新到了 ServiceStatComposite
中。
+
+
+
+**ServiceConfig**
+
+当通过 ServiceConfig 导出、注册一个服务时,它会发布一个服务导出事件。
+
+```java
+//ServiceConfig
+ protected synchronized void doExport() {
+ ...
+ doExportUrls();
+ exported();
+ }
+```
+
+```java
+private void doExportUrls() {
+ ...
+
MetricsEventBus.post(RegistryEvent.toRsEvent(module.getApplicationModel(),
getUniqueServiceName(), protocols.size() * registryURLs.size()),
+ //该函数会被同步执行,如果抛出异常则触发 MetricsEvent 的 onError方法,否则触发 onFinish
+ () -> {
+ for (ProtocolConfig protocolConfig : protocols) {
+ String pathKey =
URL.buildKey(getContextPath(protocolConfig)
+ .map(p -> p + "/" + path)
+ .orElse(path), group, version);
+ if (!serverService) {
+ repository.registerService(pathKey, interfaceClass);
+ }
+ doExportUrlsFor1Protocol(protocolConfig, registryURLs);
+ }
+ return null;
+ }
+ );
+ providerModel.setServiceUrls(urls);
+ }
+```
+
+事件发布时,该事件会被转发到 RegistryMetricsCollector,触发对应的 Listener 增加
**SERVICE_REGISTER_METRIC_REQUESTS** (当前服务级注册请求总数)的计数,然后执行定义的 provider
函数。根据是否抛出异常,之后执行 onError 方法 或 onFinish 方法,增加
**SERVICE_REGISTER_METRIC_REQUESTS_FAILED**
(当前服务级注册请求失败总数)或**SERVICE_REGISTER_METRIC_REQUESTS_SUCCEED** (当前服务级注册请求成功总数)
的计数。
+
+
+
+**DefaultApplicationDeployer**
+
+它在应用部署过程中,初始化配置中心时,发布配置发生改变的事件。
+
+```java
+ private void startConfigCenter() {
+ ...
+ compositeDynamicConfiguration.addConfiguration(
+ prepareEnvironment(configCenter)
+ );
+ ...
+ }
+```
+
+```java
+ private DynamicConfiguration prepareEnvironment(ConfigCenterConfig
configCenter) {
+...
+ // Add metrics
+
MetricsEventBus.publish(ConfigCenterEvent.toChangeEvent(applicationModel,
configCenter.getConfigFile(),
configCenter.getGroup(),configCenter.getProtocol(),
ConfigChangeType.ADDED.name(), configMap.size()));
+ if (isNotEmpty(appGroup)) {
+
MetricsEventBus.publish(ConfigCenterEvent.toChangeEvent(applicationModel,
appConfigFile, appGroup, configCenter.getProtocol(),
ConfigChangeType.ADDED.name(), appConfigMap.size()));
+ }
+ ...
+
+ }
+```
+
+在 `prepareEnvironment` 方法中,会按配置中心设置的组(group)和当前应用程序的名称作为组名发布两次事件。该事件会被转发到
ConfigCenterMetricsCollector,增加 **CONFIGCENTER_METRIC_TOTAL** (配置中心配置变化次数)的计数。
+
+
+
+**ApolloDynamicConfiguration**
+
+动态配置功能的 Apollo 配置中心实现。
+
+```java
+//ApolloDynamicConfiguration
+@Override
+public void onChange(com.ctrip.framework.apollo.model.ConfigChangeEvent
changeEvent) {
+ ...
+
MetricsEventBus.publish(ConfigCenterEvent.toChangeEvent(applicationModel,
event.getKey(), event.getGroup(),ConfigCenterEvent.APOLLO_PROTOCOL,
ConfigChangeType.ADDED.name(), SELF_INCREMENT_SIZE));
+ }
+```
+
+当Apollo配置中心的配置发生变化时,它的 `onChange` 方法会被触发,并在最后发布一个
ConfigCenterEvent。该事件最终转发到ConfigCenterMetricsCollector 中,同样增加
**CONFIGCENTER_METRIC_TOTAL **的计数。
+
+
+
+**NacosDynamicConfiguration**
+
+动态配置功能的 Nacos 配置中心实现。
+
+```java
+//NacosDynamicConfiguration
+@Override
+ public void innerReceive(String dataId, String group, String
configInfo) {
+ ...
+
MetricsEventBus.publish(ConfigCenterEvent.toChangeEvent(applicationModel,
event.getKey(), event.getGroup(),
+ ConfigCenterEvent.NACOS_PROTOCOL,
ConfigChangeType.ADDED.name(), SELF_INCREMENT_SIZE));
+ }
+```
+
+当Nacos配置中心的配置发生变化时,它的 `innerReceive` 方法被触发,发布一个 ConfigCenterEvent。它的处理流程和
ApolloDynamicConfiguration 一致,最终增加 **CONFIGCENTER_METRIC_TOTAL **的计数。
+
+
+
+**ZookeeperDataListener**
+
+动态配置功能的 Zookeeper 实现。
+
+```java
+//ZookeeperDataListener
+@Override
+ public void dataChanged(String path, Object value, EventType eventType) {
+ ...
+
MetricsEventBus.publish(ConfigCenterEvent.toChangeEvent(applicationModel,
configChangeEvent.getKey(), configChangeEvent.getGroup(),
+ ConfigCenterEvent.ZK_PROTOCOL, ConfigChangeType.ADDED.name(),
SELF_INCREMENT_SIZE));
+ }
+```
+
+在指标收集层面,它的行为和前文中两个配置中心一致,此处不详细展开三个配置中心具体实现的异同。
+
+
+
+**AbstractMetadataReport**
+
+元数据报告接口的抽象实现。它的三个实现
(NacosMetadataReport、RedisMetadataReport、ZookeeperMetadataReport)均使用它的指标事件逻辑。
+
+当订阅新服务并获取它的元数据时,它会发布一个 MetadataEvent 触发相关指标的修改。
+
+```java
+private void storeProviderMetadataTask(MetadataIdentifier
providerMetadataIdentifier, ServiceDefinition serviceDefinition) {
+...
+ MetricsEventBus.post(metadataEvent, () ->
+ {
+ boolean result = true;
+ try {
+ ...
+ doStoreProviderMetadata(providerMetadataIdentifier, data);
+ saveProperties(providerMetadataIdentifier, data, true,
!syncReport);
+ ...
+ } catch (Exception e) {
+ ...
+ result = false;
+ }
+ return result;
+ }, aBoolean -> aBoolean
+ );
+ }
+```
+
+在前文中,我们提到 MetricsEventBus.post 的第二个参数是实际要进行的业务操作,第三个参数则是根据业务操作返回值判断操作是否成功的逻辑。
+
+此处的业务操作是尝试存储目标服务的元数据。执行操作之前,会先发布事件,最终增加 **STORE_PROVIDER_METADATA**
(尝试存储服务元数据次数)的计数。如果产生异常,会增加 **STORE_PROVIDER_METADATA_ERROR** (存储服务元数据失败)
的计数,否则增加**STORE_PROVIDER_METADATA_SUCCEED**(存储服务元数据成功) 的计数。
\ No newline at end of file
diff --git
a/static/imgs/blog/metrics-source-blog/metris-event-dispatch-full.png
b/static/imgs/blog/metrics-source-blog/metris-event-dispatch-full.png
new file mode 100644
index 00000000000..ec8a0b0aeaa
Binary files /dev/null and
b/static/imgs/blog/metrics-source-blog/metris-event-dispatch-full.png differ