AndrewJSchofield commented on code in PR #14575: URL: https://github.com/apache/kafka/pull/14575#discussion_r1366906330
########## clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricsEmitter.java: ########## @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.telemetry.internals; + +import java.util.Collections; +import java.util.List; + +import java.io.Closeable; + +/** + * An {@code MetricsEmitter} emits the values held by the {@link SinglePointMetric}, likely first converting them + * to a format suitable for exposure, storage, or transmission. The telemetry reporter is likely + * the entity that is familiar with the underlying method of making the metrics visible to the + * broker. Thus, it is the primary place in the code where the implementation details are known. + * + * <p> + * + * An {@code MetricsEmitter} is stateless and the telemetry reporter should assume that the object is + * not thread safe and thus concurrent access to either the + * {@link #shouldEmitMetric(MetricKeyable)} or {@link #emitMetric(SinglePointMetric)} should be avoided. + * + * Regarding threading, the {@link #init()} and {@link #close()} methods may be called from + * different threads and so proper care should be taken by implementations of the + * {@code MetricsCollector} interface to be thread-safe. However, the telemetry reporter must + * ensure that the {@link #emitMetric(SinglePointMetric)} method should only be invoked in a synchronous Review Comment: I suppose this is true for all of the various emit methods. ########## clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricKey.java: ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.telemetry.internals; + +import org.apache.kafka.common.MetricName; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; + +/** + * Value object that contains the name and tags for a Metric. + */ +public class MetricKey implements MetricKeyable { + + private final String name; + private final Map<String, String> tags; + + /** + * Create a {@code MetricKey} + * + * @param name metric name. This should be the .converted. name of the metric (the final name Review Comment: The KIP uses the term `telemetry metric name`. If that's what is meant by `.converted.` name of the metric, please use `telemetry metric name` instead. ########## clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetry.java: ########## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.telemetry; + +import org.apache.kafka.common.metrics.MetricsContext; +import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Optional; + +/** + * A {@link MetricsReporter} may implement this interface to indicate support for collecting client Review Comment: My view is that this is now in the correct place in the source tree. It's a mixin for `MetricsReporter` and that's in `clients/src/main/java/org/apache/kafka/common/metrics`. So, I think this is correct. ########## clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetryPayload.java: ########## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.telemetry; Review Comment: I think the KIP does not specify the package, which it probably should. I'm happy with `org.apache.kafka.common.telemetry`. All of the interfaces in this PR in this package are exactly as described in the KIP. Only the package information was omitted. ########## clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricsEmitter.java: ########## @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.telemetry.internals; + +import java.util.Collections; +import java.util.List; + +import java.io.Closeable; + +/** + * An {@code MetricsEmitter} emits the values held by the {@link SinglePointMetric}, likely first converting them Review Comment: "likely" seems a bit odd. I would have that that the MetricsEmitter does or does not do these things. ########## clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricsCollector.java: ########## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.telemetry.internals; + +/** + * A {@code MetricsCollector} is responsible for scraping a source of metrics and forwarding + * them to the given {@link MetricsEmitter}. For example, a given collector might be used to collect + * system metrics, Kafka metrics, JVM metrics, or other metrics that are to be captured, exposed, + * and/or forwarded. + * + * <p/> + * + * In general, a {@code MetricsCollector} implementation is closely managed by another entity + * (that entity is colloquially referred to as the "telemetry reporter") that will be in + * charge of its lifecycle via the {@link #start()} and {@link #stop()} methods. The telemetry + * reporter should ensure that the {@link #start()} method is invoked <i>once and only once</i> + * before calls to {@link #collect(MetricsEmitter)} are made. Implementations of {@code MetricsCollector} + * should allow for the corner-case that {@link #stop()} is called before {@link #start()}, + * which might happen in the case of error on startup of the telemetry reporter. Review Comment: And what happens then? I suppose that the MetricsCollector remains STOPPED, but does `start()` fail? ########## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetrySender.java: ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.telemetry.internals; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.requests.AbstractRequest.Builder; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryResponse; + +import java.util.Optional; + +/** + * The interface used by the `NetworkClient` to send telemetry requests. + */ +public interface ClientTelemetrySender extends AutoCloseable { + + /** + * Return the next time when the telemetry API should be attempted (i.e., interval time has elapsed). + * <p> + * If another telemetry API is in-flight, then {@code timeoutMs} should be returned as the + * maximum wait time. + * + * @param timeoutMs The timeout for the inflight telemetry API call. + * @return remaining time in ms till the telemetry API be attempted again. + */ + long timeToNextUpdate(long timeoutMs); + + /** + * Return the telemetry request based on client state i.e. determine if + * {@link org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest} or + * {@link org.apache.kafka.common.requests.PushTelemetryRequest} be constructed. + * + * @return request for telemetry API call. + */ + Optional<Builder<?>> createRequest(); + + /** + * Handle successful response for get telemetry subscriptions request. + * + * @param response subscriptions telemetry API response + */ + void handleResponse(GetTelemetrySubscriptionsResponse response); Review Comment: I'm happy with the method signatures as currently written in the code of this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
