Pil0tXia commented on code in PR #3446:
URL: https://github.com/apache/eventmesh/pull/3446#discussion_r1450991643


##########
eventmesh-common/src/main/java/org/apache/eventmesh/common/MetricsConstants.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common;
+
+
+public abstract class MetricsConstants {
+
+    private MetricsConstants() {
+
+    }

Review Comment:
   Maybe decorating this constant with `class` is enough, instead of `abstract 
class`.
   
   The usages of this constant are all in metrics-plugin module, so perhaps 
placing this class in metrics-plugin module will be better.



##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java:
##########
@@ -68,37 +67,44 @@ public HeartBeatProcessor(final EventMeshHTTPServer 
eventMeshHTTPServer) {
     public void processRequest(final ChannelHandlerContext ctx, final 
AsyncContext<HttpCommand> asyncContext) throws Exception {
         HttpCommand responseEventMeshCommand;
         final String localAddress = IPUtils.getLocalAddress();
-        HttpCommand request = asyncContext.getRequest();
+
         if (log.isInfoEnabled()) {
             log.info("cmd={}|{}|client2eventMesh|from={}|to={}",
-                RequestCode.get(Integer.valueOf(request.getRequestCode())),
+                
RequestCode.get(Integer.valueOf(asyncContext.getRequest().getRequestCode())),
                 EventMeshConstants.PROTOCOL_HTTP,
                 RemotingHelper.parseChannelRemoteAddr(ctx.channel()), 
localAddress);
         }
-        final HeartbeatRequestHeader heartbeatRequestHeader = 
(HeartbeatRequestHeader) request.getHeader();
-        final HeartbeatRequestBody heartbeatRequestBody = 
(HeartbeatRequestBody) request.getBody();
-        EventMeshHTTPConfiguration httpConfiguration = 
eventMeshHTTPServer.getEventMeshHttpConfiguration();
+        final HeartbeatRequestHeader heartbeatRequestHeader = 
(HeartbeatRequestHeader) asyncContext.getRequest().getHeader();
+        final HeartbeatRequestBody heartbeatRequestBody = 
(HeartbeatRequestBody) asyncContext.getRequest().getBody();
         final HeartbeatResponseHeader heartbeatResponseHeader =
-            
HeartbeatResponseHeader.buildHeader(Integer.valueOf(request.getRequestCode()),
-                    httpConfiguration.getEventMeshCluster(),
-                    localAddress, httpConfiguration.getEventMeshEnv(),
-                    httpConfiguration.getEventMeshIDC());
+            
HeartbeatResponseHeader.buildHeader(Integer.valueOf(asyncContext.getRequest().getRequestCode()),
+                
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshCluster(),
+                localAddress, 
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshEnv(),
+                
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshIDC());
 
         //validate header
-
-        if (StringUtils.isAnyBlank(
-                heartbeatRequestHeader.getIdc(), 
heartbeatRequestHeader.getPid(), heartbeatRequestHeader.getSys())
-            || !StringUtils.isNumeric(heartbeatRequestHeader.getPid())) {
-            completeResponse(request, asyncContext, heartbeatResponseHeader,
-                    EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, null, 
HeartbeatResponseBody.class);
+        if (StringUtils.isBlank(heartbeatRequestHeader.getIdc())
+            || StringUtils.isBlank(heartbeatRequestHeader.getPid())
+            || !StringUtils.isNumeric(heartbeatRequestHeader.getPid())
+            || StringUtils.isBlank(heartbeatRequestHeader.getSys())) {
+            responseEventMeshCommand = 
asyncContext.getRequest().createHttpCommandResponse(
+                heartbeatResponseHeader,
+                
HeartbeatResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getRetCode(),
+                    
EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getErrMsg()));
+            asyncContext.onComplete(responseEventMeshCommand);

Review Comment:
   It seems that the main change here is the invocation of 
`createHttpCommandResponse`. What are the advantages of the validation logic 
here compared to the original? Bringing a more comprehensive validation logic?



##########
eventmesh-common/src/main/java/org/apache/eventmesh/common/Pair.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common;
+
+public class Pair<Left, Right> {
+
+    private Left left;

Review Comment:
   We have two `Pair` classes in our project:
   
   - org.apache.eventmesh.common.Pair
   - org.apache.eventmesh.runtime.common.Pair
   
   The usages of the latter one can be unified to the former one.



##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/MetricsManager.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.metrics;
+
+import org.apache.eventmesh.metrics.api.model.Metric;
+
+import java.util.List;
+
+/**
+ * metric manager interface
+ */
+
+/**
+ * MetricsManager is an interface for managing metrics.
+ */
+public interface MetricsManager {

Review Comment:
   duplicate javadoc



##########
eventmesh-metrics-plugin/eventmesh-metrics-api/src/main/java/org/apache/eventmesh/metrics/api/model/AbstractMetric.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.metrics.api.model;
+
+public abstract class AbstractMetric implements Metric {
+

Review Comment:
   The classes under `org.apache.eventmesh.metrics.api.model` are too many and 
their inheritance relationship seems a little messy.



##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java:
##########
@@ -96,6 +104,12 @@ public EventMeshServer() {
         if (BOOTSTRAP_LIST.isEmpty()) {
             BOOTSTRAP_LIST.add(new EventMeshTcpBootstrap(this));
         }
+        List<String> metricsPluginTypes = 
configuration.getEventMeshMetricsPluginType();
+        if (CollectionUtils.isNotEmpty(metricsPluginTypes)) {
+            List<MetricsRegistry> metricsRegistries = 
metricsPluginTypes.stream().map(metric -> 
MetricsPluginFactory.getMetricsRegistry(metric))
+                .collect(Collectors.toList());
+            eventMeshMetricsManager = new 
EventMeshMetricsManager(metricsRegistries);
+        }

Review Comment:
   I think that maybe it is not a good design to place a list of metrics plugin 
types here. Is there a similar way to init metrics like other plugins (L79~L83)?



##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/TcpMetricsCalculator.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.metrics.tcp;
+
+import org.apache.eventmesh.common.EventMeshThreadFactory;
+import org.apache.eventmesh.common.ThreadPoolFactory;
+import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
+import org.apache.eventmesh.runtime.constants.EventMeshConstants;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
+import org.apache.eventmesh.runtime.metrics.MonitorMetricConstants;
+
+import java.math.BigDecimal;
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+class TcpMetricsCalculator {
+
+    private static final int period = 30 * 1000;

Review Comment:
   Why TCP needs a "Calculator" comparing to HTTP and gRPC?



##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/GeneralMetricsManager.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.metrics;
+
+
+import org.apache.eventmesh.metrics.api.MetricsRegistry;
+
+import org.apache.commons.collections4.MapUtils;
+
+import java.util.List;
+import java.util.Map;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+
+import lombok.experimental.UtilityClass;
+
+/**
+ * Managing general metrics.
+ */
+@UtilityClass
+public class GeneralMetricsManager {

Review Comment:
   The naming of `GeneralMetricsManager` is a little confusing comparing with 
`MetricsManager`. How about putting them in different packages and rename it to 
a more suitable name?



##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/MetricInstrumentUnit.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.metrics;
+
+
+public abstract class MetricInstrumentUnit {
+
+    private MetricInstrumentUnit() {
+
+    }

Review Comment:
   We can put this constant class into `constant` package together with 
`MonitorMetricConstants`.



##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHttpBootstrap.java:
##########
@@ -43,6 +43,7 @@ public void init() throws Exception {
         // server init
         if (eventMeshHttpConfiguration != null) {
             eventMeshHttpServer = new EventMeshHTTPServer(eventMeshServer, 
eventMeshHttpConfiguration);
+            eventMeshHttpServer.init();
         }

Review Comment:
   Is it necessary to init eventMeshHttpServer here?



##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java:
##########
@@ -147,50 +156,54 @@ public void processRequest(final ChannelHandlerContext 
ctx, final AsyncContext<H
 
         }
 
-        ConcurrentHashMap<String, List<Client>> clientInfoMap =
-                
eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping();
-        synchronized (clientInfoMap) {
+        synchronized 
(eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping()) {
             for (final Map.Entry<String, List<Client>> groupTopicClientMapping 
: tmpMap.entrySet()) {
-                final List<Client> localClientList = 
clientInfoMap.get(groupTopicClientMapping.getKey());
+                final List<Client> localClientList =
+                    
eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping().get(groupTopicClientMapping.getKey());
                 if (CollectionUtils.isEmpty(localClientList)) {
-                    clientInfoMap.put(groupTopicClientMapping.getKey(), 
groupTopicClientMapping.getValue());
+                    
eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping()
+                        .put(groupTopicClientMapping.getKey(), 
groupTopicClientMapping.getValue());
                 } else {
                     final List<Client> tmpClientList = 
groupTopicClientMapping.getValue();
                     supplyClientInfoList(tmpClientList, localClientList);
-                    clientInfoMap.put(groupTopicClientMapping.getKey(), 
localClientList);
+                    
eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping().put(groupTopicClientMapping.getKey(),
 localClientList);
                 }
 
             }
         }
 
         final long startTime = System.currentTimeMillis();
-        HttpSummaryMetrics summaryMetrics = 
eventMeshHTTPServer.getMetrics().getSummaryMetrics();
         try {
-            final CompleteHandler<HttpCommand> handler = httpCommand -> {
-                try {
-                    if (log.isDebugEnabled()) {
-                        log.debug("{}", httpCommand);
+
+            final CompleteHandler<HttpCommand> handler = new 
CompleteHandler<HttpCommand>() {
+                @Override
+                public void onResponse(final HttpCommand httpCommand) {
+                    try {
+                        if (log.isDebugEnabled()) {
+                            log.debug("{}", httpCommand);
+                        }

Review Comment:
   `isDebugEnabled` condition is redundant.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to