Pil0tXia commented on code in PR #3446: URL: https://github.com/apache/eventmesh/pull/3446#discussion_r1450991643
########## eventmesh-common/src/main/java/org/apache/eventmesh/common/MetricsConstants.java: ########## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common; + + +public abstract class MetricsConstants { + + private MetricsConstants() { + + } Review Comment: Maybe decorating this constant with `class` is enough, instead of `abstract class`. The usages of this constant are all in metrics-plugin module, so perhaps placing this class in metrics-plugin module will be better. ########## eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java: ########## @@ -68,37 +67,44 @@ public HeartBeatProcessor(final EventMeshHTTPServer eventMeshHTTPServer) { public void processRequest(final ChannelHandlerContext ctx, final AsyncContext<HttpCommand> asyncContext) throws Exception { HttpCommand responseEventMeshCommand; final String localAddress = IPUtils.getLocalAddress(); - HttpCommand request = asyncContext.getRequest(); + if (log.isInfoEnabled()) { log.info("cmd={}|{}|client2eventMesh|from={}|to={}", - RequestCode.get(Integer.valueOf(request.getRequestCode())), + RequestCode.get(Integer.valueOf(asyncContext.getRequest().getRequestCode())), EventMeshConstants.PROTOCOL_HTTP, RemotingHelper.parseChannelRemoteAddr(ctx.channel()), localAddress); } - final HeartbeatRequestHeader heartbeatRequestHeader = (HeartbeatRequestHeader) request.getHeader(); - final HeartbeatRequestBody heartbeatRequestBody = (HeartbeatRequestBody) request.getBody(); - EventMeshHTTPConfiguration httpConfiguration = eventMeshHTTPServer.getEventMeshHttpConfiguration(); + final HeartbeatRequestHeader heartbeatRequestHeader = (HeartbeatRequestHeader) asyncContext.getRequest().getHeader(); + final HeartbeatRequestBody heartbeatRequestBody = (HeartbeatRequestBody) asyncContext.getRequest().getBody(); final HeartbeatResponseHeader heartbeatResponseHeader = - HeartbeatResponseHeader.buildHeader(Integer.valueOf(request.getRequestCode()), - httpConfiguration.getEventMeshCluster(), - localAddress, httpConfiguration.getEventMeshEnv(), - httpConfiguration.getEventMeshIDC()); + HeartbeatResponseHeader.buildHeader(Integer.valueOf(asyncContext.getRequest().getRequestCode()), + eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshCluster(), + localAddress, eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshEnv(), + eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshIDC()); //validate header - - if (StringUtils.isAnyBlank( - heartbeatRequestHeader.getIdc(), heartbeatRequestHeader.getPid(), heartbeatRequestHeader.getSys()) - || !StringUtils.isNumeric(heartbeatRequestHeader.getPid())) { - completeResponse(request, asyncContext, heartbeatResponseHeader, - EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, null, HeartbeatResponseBody.class); + if (StringUtils.isBlank(heartbeatRequestHeader.getIdc()) + || StringUtils.isBlank(heartbeatRequestHeader.getPid()) + || !StringUtils.isNumeric(heartbeatRequestHeader.getPid()) + || StringUtils.isBlank(heartbeatRequestHeader.getSys())) { + responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse( + heartbeatResponseHeader, + HeartbeatResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getRetCode(), + EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getErrMsg())); + asyncContext.onComplete(responseEventMeshCommand); Review Comment: It seems that the main change here is the invocation of `createHttpCommandResponse`. What are the advantages of the validation logic here compared to the original? Bringing a more comprehensive validation logic? ########## eventmesh-common/src/main/java/org/apache/eventmesh/common/Pair.java: ########## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common; + +public class Pair<Left, Right> { + + private Left left; Review Comment: We have two `Pair` classes in our project: - org.apache.eventmesh.common.Pair - org.apache.eventmesh.runtime.common.Pair The usages of the latter one can be unified to the former one. ########## eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/MetricsManager.java: ########## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.metrics; + +import org.apache.eventmesh.metrics.api.model.Metric; + +import java.util.List; + +/** + * metric manager interface + */ + +/** + * MetricsManager is an interface for managing metrics. + */ +public interface MetricsManager { Review Comment: duplicate javadoc ########## eventmesh-metrics-plugin/eventmesh-metrics-api/src/main/java/org/apache/eventmesh/metrics/api/model/AbstractMetric.java: ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.metrics.api.model; + +public abstract class AbstractMetric implements Metric { + Review Comment: The classes under `org.apache.eventmesh.metrics.api.model` are too many and their inheritance relationship seems a little messy. ########## eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java: ########## @@ -96,6 +104,12 @@ public EventMeshServer() { if (BOOTSTRAP_LIST.isEmpty()) { BOOTSTRAP_LIST.add(new EventMeshTcpBootstrap(this)); } + List<String> metricsPluginTypes = configuration.getEventMeshMetricsPluginType(); + if (CollectionUtils.isNotEmpty(metricsPluginTypes)) { + List<MetricsRegistry> metricsRegistries = metricsPluginTypes.stream().map(metric -> MetricsPluginFactory.getMetricsRegistry(metric)) + .collect(Collectors.toList()); + eventMeshMetricsManager = new EventMeshMetricsManager(metricsRegistries); + } Review Comment: I think that maybe it is not a good design to place a list of metrics plugin types here. Is there a similar way to init metrics like other plugins (L79~L83)? ########## eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/TcpMetricsCalculator.java: ########## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.metrics.tcp; + +import org.apache.eventmesh.common.EventMeshThreadFactory; +import org.apache.eventmesh.common.ThreadPoolFactory; +import org.apache.eventmesh.runtime.boot.EventMeshTCPServer; +import org.apache.eventmesh.runtime.constants.EventMeshConstants; +import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session; +import org.apache.eventmesh.runtime.metrics.MonitorMetricConstants; + +import java.math.BigDecimal; +import java.net.InetSocketAddress; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +class TcpMetricsCalculator { + + private static final int period = 30 * 1000; Review Comment: Why TCP needs a "Calculator" comparing to HTTP and gRPC? ########## eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/GeneralMetricsManager.java: ########## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.metrics; + + +import org.apache.eventmesh.metrics.api.MetricsRegistry; + +import org.apache.commons.collections4.MapUtils; + +import java.util.List; +import java.util.Map; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; + +import lombok.experimental.UtilityClass; + +/** + * Managing general metrics. + */ +@UtilityClass +public class GeneralMetricsManager { Review Comment: The naming of `GeneralMetricsManager` is a little confusing comparing with `MetricsManager`. How about putting them in different packages and rename it to a more suitable name? ########## eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/MetricInstrumentUnit.java: ########## @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.metrics; + + +public abstract class MetricInstrumentUnit { + + private MetricInstrumentUnit() { + + } Review Comment: We can put this constant class into `constant` package together with `MonitorMetricConstants`. ########## eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHttpBootstrap.java: ########## @@ -43,6 +43,7 @@ public void init() throws Exception { // server init if (eventMeshHttpConfiguration != null) { eventMeshHttpServer = new EventMeshHTTPServer(eventMeshServer, eventMeshHttpConfiguration); + eventMeshHttpServer.init(); } Review Comment: Is it necessary to init eventMeshHttpServer here? ########## eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java: ########## @@ -147,50 +156,54 @@ public void processRequest(final ChannelHandlerContext ctx, final AsyncContext<H } - ConcurrentHashMap<String, List<Client>> clientInfoMap = - eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping(); - synchronized (clientInfoMap) { + synchronized (eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping()) { for (final Map.Entry<String, List<Client>> groupTopicClientMapping : tmpMap.entrySet()) { - final List<Client> localClientList = clientInfoMap.get(groupTopicClientMapping.getKey()); + final List<Client> localClientList = + eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping().get(groupTopicClientMapping.getKey()); if (CollectionUtils.isEmpty(localClientList)) { - clientInfoMap.put(groupTopicClientMapping.getKey(), groupTopicClientMapping.getValue()); + eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping() + .put(groupTopicClientMapping.getKey(), groupTopicClientMapping.getValue()); } else { final List<Client> tmpClientList = groupTopicClientMapping.getValue(); supplyClientInfoList(tmpClientList, localClientList); - clientInfoMap.put(groupTopicClientMapping.getKey(), localClientList); + eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping().put(groupTopicClientMapping.getKey(), localClientList); } } } final long startTime = System.currentTimeMillis(); - HttpSummaryMetrics summaryMetrics = eventMeshHTTPServer.getMetrics().getSummaryMetrics(); try { - final CompleteHandler<HttpCommand> handler = httpCommand -> { - try { - if (log.isDebugEnabled()) { - log.debug("{}", httpCommand); + + final CompleteHandler<HttpCommand> handler = new CompleteHandler<HttpCommand>() { + @Override + public void onResponse(final HttpCommand httpCommand) { + try { + if (log.isDebugEnabled()) { + log.debug("{}", httpCommand); + } Review Comment: `isDebugEnabled` condition is redundant. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
