mjsax commented on code in PR #14575: URL: https://github.com/apache/kafka/pull/14575#discussion_r1364913876
########## clients/src/main/java/org/apache/kafka/common/telemetry/collector/MetricsCollector.java: ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.telemetry.collector; + +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.telemetry.emitter.Emitter; + +/** + * A {@code MetricsCollector} is responsible for scraping a source of metrics and forwarding + * them to the given {@link Emitter}. For example, a given collector might be used to collect + * system metrics, Kafka metrics, JVM metrics, or other metrics that are to be captured, exposed, + * and/or forwarded. + * + * <p/> Review Comment: Should just be `<p>` (similar below) ########## clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetryPayload.java: ########## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.telemetry; Review Comment: What just comes to my mind: the KIP should specify where new public interfaces are added. It's missing. Can we add it? ########## clients/src/main/java/org/apache/kafka/common/telemetry/collector/MetricsCollector.java: ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.telemetry.collector; + +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.telemetry.emitter.Emitter; + +/** + * A {@code MetricsCollector} is responsible for scraping a source of metrics and forwarding + * them to the given {@link Emitter}. For example, a given collector might be used to collect + * system metrics, Kafka metrics, JVM metrics, or other metrics that are to be captured, exposed, + * and/or forwarded. + * + * <p/> + * + * In general, a {@code MetricsCollector} implementation is closely managed by another entity + * (that entity is colloquially referred to as the "telemetry reporter") that will be in + * charge of its lifecycle via the {@link #start()} and {@link #stop()} methods. The telemetry + * reporter should ensure that the {@link #start()} method is invoked <i>once and only once</i> + * before calls to {@link #collect(Emitter)} are made. Implementations of {@code MetricsCollector} + * should allow for the corner-case that {@link #stop()} is called before {@link #start()}, + * which might happen in the case of error on startup of the telemetry reporter. + * + * <p/> + * + * Regarding threading, the {@link #start()} and {@link #stop()} methods may be called from + * different threads and so proper care should be taken by implementations of the + * {@code MetricsCollector} interface to be thread-safe. However, the telemetry reporter must + * ensure that the {@link #collect(Emitter)} method should only be invoked in a synchronous + * manner. + * + * @see Emitter + */ [email protected] +public interface MetricsCollector { Review Comment: This interface is not on the KIP either -- should it be moved to `internal` package? (For this case, we also don't need the `Evolving` annotation, that we only need for public stuff) ########## clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java: ########## @@ -24,13 +24,14 @@ import org.apache.kafka.common.Reconfigurable; import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.telemetry.ClientTelemetry; /** * A plugin interface to allow things to listen as new metrics are created so they can be reported. * <p> * Implement {@link org.apache.kafka.common.ClusterResourceListener} to receive cluster metadata once it's available. Please see the class documentation for ClusterResourceListener for more information. */ -public interface MetricsReporter extends Reconfigurable, AutoCloseable { +public interface MetricsReporter extends Reconfigurable, AutoCloseable, ClientTelemetry { Review Comment: Just dug a little bit in the code. `MetricsReporter` seems to be a public plugable interface, that allows users to write their own reported and plug them into a client via `metric.reporters` config. We should not enforce users to implement `ClientTelemetry`. Also, the KIP JavaDocs says: ``` A {@link MetricsReporter} may implement this interface to indicate support for collecting client telemetry on the server side public interface ClientTelemetry ``` So it seems to be a server side interface, not belonging to the client at all? ########## clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetryPayload.java: ########## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.telemetry; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.nio.ByteBuffer; + [email protected] +public interface ClientTelemetryPayload { + + /** + * Client's instance id. Review Comment: Should we use `@return` (same below) ########## clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetry.java: ########## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.telemetry; + +import org.apache.kafka.common.metrics.MetricsContext; +import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Optional; + +/** + * A {@link MetricsReporter} may implement this interface to indicate support for collecting client + * telemetry on the client or server side. + */ [email protected] +public interface ClientTelemetry { + + /** + * Implemented by the client {@link MetricsReporter} to provide a {@link ClientTelemetrySender} + * instance. + * <p> + * This instance may be cached by the client. + * <p> + * This method must always be called after the initial call to + * {@link MetricsReporter#contextChange(MetricsContext)} on the {@link MetricsReporter} + * implementing this interface. Review Comment: Not sure if I understand this comment? Can you elaborate? ########## clients/src/main/java/org/apache/kafka/common/telemetry/metrics/SinglePointMetric.java: ########## @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.telemetry.metrics; + +/** + * This class represents a metric that does not yet contain resource tags. + * These additional resource tags will be added before emitting metrics by the telemetry reporter. + */ +public class SinglePointMetric implements MetricKeyable { Review Comment: `internals` package? ########## clients/src/main/java/org/apache/kafka/common/telemetry/metrics/MetricKeyable.java: ########## @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.telemetry.metrics; + +import org.apache.kafka.common.annotation.InterfaceStability; + +/** + * An object that provides a MetricKey that we can use to uniquely identify a metric. This + * is useful for filtering as well as calculating delta metrics. + */ [email protected] +public interface MetricKeyable { Review Comment: `internals` package? ########## clients/src/main/java/org/apache/kafka/common/telemetry/emitter/Emitter.java: ########## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.telemetry.emitter; + +import java.util.Collections; +import java.util.List; +import org.apache.kafka.common.telemetry.metrics.MetricKey; +import org.apache.kafka.common.telemetry.metrics.MetricKeyable; +import org.apache.kafka.common.telemetry.metrics.SinglePointMetric; + +import java.io.Closeable; + +/** + * An {@code Emitter} emits the values held by the {@link SinglePointMetric}, likely first converting them + * to a format suitable for exposure, storage, or transmission. The telemetry reporter is likely + * the entity that is familiar with the underlying method of making the metrics visible to the + * broker. Thus, it is the primary place in the code where the implementation details are known. + * + * <p/> + * + * An {@code Emitter} is stateless and the telemetry reporter should assume that the object is + * not thread safe and thus concurrent access to either the + * {@link #shouldEmitMetric(MetricKeyable)} or {@link #emitMetric(SinglePointMetric)} should be avoided. + * + * Regarding threading, the {@link #init()} and {@link #close()} methods may be called from + * different threads and so proper care should be taken by implementations of the + * {@code MetricsCollector} interface to be thread-safe. However, the telemetry reporter must + * ensure that the {@link #emitMetric(SinglePointMetric)} method should only be invoked in a synchronous + * manner. + */ +public interface Emitter extends Closeable { Review Comment: Should this be in `internals` package, too? ########## clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetrySender.java: ########## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.telemetry; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.AbstractRequest.Builder; + +import java.util.Optional; + +/** + * A {@link MetricsReporter} may implement this interface to indicate support for sending client + * telemetry to the broker. + */ [email protected] +public interface ClientTelemetrySender extends AutoCloseable { + + /** + * Return the next time when the telemetry API should be attempted (i.e., interval time has elapsed). + * <p> + * If another telemetry API is in-flight, then {@code timeoutMs} should be returned as the + * maximum wait time. + * + * @param timeoutMs The timeout for the inflight telemetry API call. + * @return remaining time in ms till the telemetry API be attempted again. + */ + long timeToNextUpdate(long timeoutMs); Review Comment: Why is this part of public API? -- Should the client internal implementation not track this? Why would we need to push this on the user? Thinking about this one more, I am actually wondering why a user would need/want to implement anything client side to begin with? Seems unnecessary (or maybe I am missing something)? ########## clients/src/main/java/org/apache/kafka/common/telemetry/metrics/MetricKey.java: ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.telemetry.metrics; + +import org.apache.kafka.common.MetricName; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; + +/** + * Value object that contains the name and tags for a Metric. + */ +public class MetricKey implements MetricKeyable { Review Comment: Seem the broker plugin would need to use this, so it's public? Or would the broker only see the generate output of `toString()`? For this case, this should go into `internals` package? ########## clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetrySender.java: ########## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.telemetry; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.AbstractRequest.Builder; + +import java.util.Optional; + +/** + * A {@link MetricsReporter} may implement this interface to indicate support for sending client + * telemetry to the broker. Review Comment: I thought `ClientTelemetry` would be the interface to indicate this, while `ClientTelemetrySender` implements the required logic to actually send metrics? But because it was not on the KIP, hard to tell. ########## clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetry.java: ########## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.telemetry; + +import org.apache.kafka.common.metrics.MetricsContext; +import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Optional; + +/** + * A {@link MetricsReporter} may implement this interface to indicate support for collecting client Review Comment: > to support either client or server side mapping What does this mean? My understanding was that `ClientTelemetry` is something that is only used client side (for this case, I tend to agree with Kirk -- if we really go this route, we need to update the KIP accordingly). ########## clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetry.java: ########## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.telemetry; + +import org.apache.kafka.common.metrics.MetricsContext; +import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Optional; + +/** + * A {@link MetricsReporter} may implement this interface to indicate support for collecting client + * telemetry on the client or server side. Review Comment: Can you elaborate? The JavaDoc comment below clearly say: ``` Implemented by the client {@link MetricsReporter} to provide a {@link ClientTelemetrySender} instance. ``` What is actually a contraction to what in the KIP: ``` A {@link MetricsReporter} may implement this interface to indicate support for collecting client telemetry on the server side public interface ClientTelemetry { ``` So it seems to be for clients only. Not sure what you mean by "divide the interfaces"? Should there be two separate `ClientTelemetry` one for server one for client? ########## clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetry.java: ########## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.telemetry; + +import org.apache.kafka.common.metrics.MetricsContext; +import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Optional; + +/** + * A {@link MetricsReporter} may implement this interface to indicate support for collecting client + * telemetry on the client or server side. + */ [email protected] +public interface ClientTelemetry { + + /** + * Implemented by the client {@link MetricsReporter} to provide a {@link ClientTelemetrySender} + * instance. + * <p> + * This instance may be cached by the client. + * + * @return Optional client side instance of {@link ClientTelemetrySender}. + */ + default Optional<ClientTelemetrySender> clientSender() { Review Comment: Yeah, not in the KIP. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
