philipnee commented on code in PR #14575: URL: https://github.com/apache/kafka/pull/14575#discussion_r1366056376
########## clients/src/main/java/org/apache/kafka/common/telemetry/internals/Emitter.java: ########## @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.telemetry.internals; + +import java.util.Collections; +import java.util.List; + +import java.io.Closeable; + +/** + * An {@code Emitter} emits the values held by the {@link SinglePointMetric}, likely first converting them + * to a format suitable for exposure, storage, or transmission. The telemetry reporter is likely + * the entity that is familiar with the underlying method of making the metrics visible to the + * broker. Thus, it is the primary place in the code where the implementation details are known. + * + * <p> + * + * An {@code Emitter} is stateless and the telemetry reporter should assume that the object is + * not thread safe and thus concurrent access to either the + * {@link #shouldEmitMetric(MetricKeyable)} or {@link #emitMetric(SinglePointMetric)} should be avoided. + * + * Regarding threading, the {@link #init()} and {@link #close()} methods may be called from + * different threads and so proper care should be taken by implementations of the + * {@code MetricsCollector} interface to be thread-safe. 
However, the telemetry reporter must + * ensure that the {@link #emitMetric(SinglePointMetric)} method should only be invoked in a synchronous + * manner. + */ +public interface Emitter extends Closeable { Review Comment: `MetricEmitter`? ########## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetrySender.java: ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.telemetry.internals; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.requests.AbstractRequest.Builder; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryResponse; + +import java.util.Optional; + +/** + * The interface used by the `NetworkClient` to send telemetry requests. + */ +public interface ClientTelemetrySender extends AutoCloseable { + + /** + * Return the next time when the telemetry API should be attempted (i.e., interval time has elapsed). + * <p> + * If another telemetry API is in-flight, then {@code timeoutMs} should be returned as the + * maximum wait time. + * + * @param timeoutMs The timeout for the inflight telemetry API call. 
+ * @return remaining time in ms till the telemetry API be attempted again. + */ + long timeToNextUpdate(long timeoutMs); + + /** + * Return the telemetry request based on client state i.e. determine if + * {@link org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest} or + * {@link org.apache.kafka.common.requests.PushTelemetryRequest} be constructed. + * + * @return request for telemetry API call. + */ + Optional<Builder<?>> createRequest(); + + /** + * Handle successful response for get telemetry subscriptions request. + * + * @param response subscriptions telemetry API response + */ + void handleResponse(GetTelemetrySubscriptionsResponse response); + + /** + * Handle successful response for push telemetry request. + * + * @param response push telemetry API response + */ + void handleResponse(PushTelemetryResponse response); + + /** + * Handle response for failed get telemetry subscriptions request. + * + * @param kafkaException the fatal exception. + */ + void handleFailedGetTelemetrySubscriptionsRequest(KafkaException kafkaException); Review Comment: `handleFailedGetTelemetricSubscriptionResponse` ? We are talking about the response right? Can it take a Throwable type instead of KafkaException? The exception can be retriable as well right? ########## clients/src/main/java/org/apache/kafka/common/telemetry/internals/Emitter.java: ########## @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.telemetry.internals; + +import java.util.Collections; +import java.util.List; + +import java.io.Closeable; + +/** + * An {@code Emitter} emits the values held by the {@link SinglePointMetric}, likely first converting them + * to a format suitable for exposure, storage, or transmission. The telemetry reporter is likely + * the entity that is familiar with the underlying method of making the metrics visible to the + * broker. Thus, it is the primary place in the code where the implementation details are known. + * + * <p> + * + * An {@code Emitter} is stateless and the telemetry reporter should assume that the object is + * not thread safe and thus concurrent access to either the + * {@link #shouldEmitMetric(MetricKeyable)} or {@link #emitMetric(SinglePointMetric)} should be avoided. + * + * Regarding threading, the {@link #init()} and {@link #close()} methods may be called from + * different threads and so proper care should be taken by implementations of the + * {@code MetricsCollector} interface to be thread-safe. However, the telemetry reporter must + * ensure that the {@link #emitMetric(SinglePointMetric)} method should only be invoked in a synchronous + * manner. + */ +public interface Emitter extends Closeable { + + /** + * Performs the necessary logic to determine if the metric is to be emitted. The telemetry + * reporter should respect this and not just call the {@link #emitMetric(SinglePointMetric)} directly. 
+ * + * @param metricKeyable Object from which to get the {@link MetricKey} + * @return {@code true} if the metric should be emitted, {@code false} otherwise + */ + boolean shouldEmitMetric(MetricKeyable metricKeyable); + + /** + * Emits the metric in an implementation-specific fashion. Depending on the implementation, + * calls made to this after {@link #close()} has been invoked will fail. + * + * @param metric {@code SinglePointMetric} + * @return {@code true} if the metric was emitted, {@code false} otherwise + */ + boolean emitMetric(SinglePointMetric metric); Review Comment: An alternative is to return void and throw an exception upon failed emits. What are the scenarios where the metrics cannot be emitted? ########## clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricKey.java: ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.telemetry.internals; + +import org.apache.kafka.common.MetricName; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; + +/** + * Value object that contains the name and tags for a Metric. 
+ */ +public class MetricKey implements MetricKeyable { + + private final String name; + private final Map<String, String> tags; + + /** + * Create a {@code MetricKey} + * + * @param name metric name. This should be the .converted. name of the metric (the final name + * under which this metric is emitted). + */ + public MetricKey(String name) { + this(name, null); + } + + /** + * Create a {@code MetricKey} + * + * @param name metric name. This should be the .converted. name of the metric (the final name + * under which this metric is emitted). + * @param tags mapping of tag keys to values. + */ + public MetricKey(String name, Map<String, String> tags) { + this.name = Objects.requireNonNull(name); + this.tags = tags != null ? Collections.unmodifiableMap(tags) : Collections.emptyMap(); + } + + public MetricKey(MetricName metricName) { + this(metricName.name(), metricName.tags()); + } + + @Override + public MetricKey key() { + return this; + } + + public String getName() { + return name; + } + + public Map<String, String> tags() { + return tags; + } + + @Override + public int hashCode() { + return Objects.hash(name, tags); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + MetricKey other = (MetricKey) obj; + return this.getName().equals(other.getName()) && this.tags().equals(other.tags()); + } + + @Override + public String toString() { + return "MetricKey [name=" + getName() + ", tags=" + tags() + "]"; Review Comment: I wonder if we should follow the conventional way to do toString to make it consistent with the rest of the repo. @mjsax - Is there a conventional way to "toString" ? Example ``` "LeaderAndEpoch{" + "leader=" + leader + ", epoch=" + epoch.map(Number::toString).orElse("absent") + '}'; ``` Honestly speaking - Not all toString here use the same format, but I think we should try to be as similar as possible. 
########## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetrySender.java: ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.telemetry.internals; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.requests.AbstractRequest.Builder; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryResponse; + +import java.util.Optional; + +/** + * The interface used by the `NetworkClient` to send telemetry requests. + */ +public interface ClientTelemetrySender extends AutoCloseable { + + /** + * Return the next time when the telemetry API should be attempted (i.e., interval time has elapsed). + * <p> + * If another telemetry API is in-flight, then {@code timeoutMs} should be returned as the + * maximum wait time. + * + * @param timeoutMs The timeout for the inflight telemetry API call. + * @return remaining time in ms till the telemetry API be attempted again. + */ + long timeToNextUpdate(long timeoutMs); + + /** + * Return the telemetry request based on client state i.e. 
determine if + * {@link org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest} or + * {@link org.apache.kafka.common.requests.PushTelemetryRequest} be constructed. + * + * @return request for telemetry API call. + */ + Optional<Builder<?>> createRequest(); + + /** + * Handle successful response for get telemetry subscriptions request. + * + * @param response subscriptions telemetry API response + */ + void handleResponse(GetTelemetrySubscriptionsResponse response); + + /** + * Handle successful response for push telemetry request. + * + * @param response push telemetry API response + */ + void handleResponse(PushTelemetryResponse response); Review Comment: ditto `handlePushTelemetricResponse` ########## clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetryPayload.java: ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.telemetry; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.nio.ByteBuffer; + +@InterfaceStability.Evolving +public interface ClientTelemetryPayload { + + /** + * @return Client's instance id. 
+ */ + Uuid clientInstanceId(); + + /** + * Indicates whether client is terminating, e.g., the last metrics push from this client instance. + * + * @return {@code true} if client is terminating, else false + */ + boolean isTerminating(); Review Comment: Can we document the fact that we are sending payload with a field Terminating to prevent the send from being rate-limited? I was a bit confused about this field so I needed to read the KIP to find out the purpose. ```When a client with an active metrics subscription is being shut down, it should send its final metrics without waiting for the PushIntervalMs time. To avoid the receiving broker’s metrics rate-limiter discarding this out-of-profile push, the PushTelemetryRequest.Terminating field must be set to true. A broker must only allow one such unthrottled metrics push for each combination of client instance ID and SubscriptionId.``` ########## clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetryPayload.java: ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.common.telemetry; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.nio.ByteBuffer; + [email protected] +public interface ClientTelemetryPayload { + + /** + * @return Client's instance id. + */ + Uuid clientInstanceId(); + + /** + * Indicates whether client is terminating, e.g., the last metrics push from this client instance. + * + * @return {@code true} if client is terminating, else false + */ + boolean isTerminating(); Review Comment: Can we document the fact that we are sending payload with a field Terminating to prevent the send from being rate-limited? I was a bit confused about this field so I needed to read the KIP to find out the purpose. ```When a client with an active metrics subscription is being shut down, it should send its final metrics without waiting for the PushIntervalMs time. To avoid the receiving broker’s metrics rate-limiter discarding this out-of-profile push, the PushTelemetryRequest.Terminating field must be set to true. A broker must only allow one such unthrottled metrics push for each combination of client instance ID and SubscriptionId.``` ########## clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricKey.java: ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.telemetry.internals; + +import org.apache.kafka.common.MetricName; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; + +/** + * Value object that contains the name and tags for a Metric. + */ +public class MetricKey implements MetricKeyable { + + private final String name; + private final Map<String, String> tags; + + /** + * Create a {@code MetricKey} + * + * @param name metric name. This should be the .converted. name of the metric (the final name + * under which this metric is emitted). + */ + public MetricKey(String name) { + this(name, null); + } + + /** + * Create a {@code MetricKey} + * + * @param name metric name. This should be the .converted. name of the metric (the final name + * under which this metric is emitted). + * @param tags mapping of tag keys to values. + */ + public MetricKey(String name, Map<String, String> tags) { + this.name = Objects.requireNonNull(name); + this.tags = tags != null ? Collections.unmodifiableMap(tags) : Collections.emptyMap(); Review Comment: if the tags is optional, can we wrap it with Optional type? ########## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetrySender.java: ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.telemetry.internals; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.requests.AbstractRequest.Builder; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryResponse; + +import java.util.Optional; + +/** + * The interface used by the `NetworkClient` to send telemetry requests. + */ +public interface ClientTelemetrySender extends AutoCloseable { + + /** + * Return the next time when the telemetry API should be attempted (i.e., interval time has elapsed). + * <p> + * If another telemetry API is in-flight, then {@code timeoutMs} should be returned as the + * maximum wait time. + * + * @param timeoutMs The timeout for the inflight telemetry API call. + * @return remaining time in ms till the telemetry API be attempted again. + */ + long timeToNextUpdate(long timeoutMs); + + /** + * Return the telemetry request based on client state i.e. determine if + * {@link org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest} or + * {@link org.apache.kafka.common.requests.PushTelemetryRequest} be constructed. + * + * @return request for telemetry API call. + */ + Optional<Builder<?>> createRequest(); + + /** + * Handle successful response for get telemetry subscriptions request. 
+ * + * @param response subscriptions telemetry API response + */ + void handleResponse(GetTelemetrySubscriptionsResponse response); + + /** + * Handle successful response for push telemetry request. + * + * @param response push telemetry API response + */ + void handleResponse(PushTelemetryResponse response); + + /** + * Handle response for failed get telemetry subscriptions request. + * + * @param kafkaException the fatal exception. + */ + void handleFailedGetTelemetrySubscriptionsRequest(KafkaException kafkaException); + + /** + * Handle response for failed push telemetry request. Review Comment: handling failed telemetric push response, right? ########## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetrySender.java: ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.common.telemetry.internals; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.requests.AbstractRequest.Builder; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryResponse; + +import java.util.Optional; + +/** + * The interface used by the `NetworkClient` to send telemetry requests. + */ +public interface ClientTelemetrySender extends AutoCloseable { + + /** + * Return the next time when the telemetry API should be attempted (i.e., interval time has elapsed). + * <p> + * If another telemetry API is in-flight, then {@code timeoutMs} should be returned as the + * maximum wait time. + * + * @param timeoutMs The timeout for the inflight telemetry API call. + * @return remaining time in ms till the telemetry API be attempted again. + */ + long timeToNextUpdate(long timeoutMs); + + /** + * Return the telemetry request based on client state i.e. determine if + * {@link org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest} or + * {@link org.apache.kafka.common.requests.PushTelemetryRequest} be constructed. + * + * @return request for telemetry API call. + */ + Optional<Builder<?>> createRequest(); + + /** + * Handle successful response for get telemetry subscriptions request. + * + * @param response subscriptions telemetry API response + */ + void handleResponse(GetTelemetrySubscriptionsResponse response); + + /** + * Handle successful response for push telemetry request. + * + * @param response push telemetry API response + */ + void handleResponse(PushTelemetryResponse response); + + /** + * Handle response for failed get telemetry subscriptions request. + * + * @param kafkaException the fatal exception. + */ + void handleFailedGetTelemetrySubscriptionsRequest(KafkaException kafkaException); + + /** + * Handle response for failed push telemetry request. + * + * @param kafkaException the fatal exception. 
+ */ + void handleFailedPushTelemetryRequest(KafkaException kafkaException); Review Comment: same as above, it should be `handleFailed...Response` right? ########## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetrySender.java: ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.telemetry.internals; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.requests.AbstractRequest.Builder; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryResponse; + +import java.util.Optional; + +/** + * The interface used by the `NetworkClient` to send telemetry requests. + */ +public interface ClientTelemetrySender extends AutoCloseable { + + /** + * Return the next time when the telemetry API should be attempted (i.e., interval time has elapsed). + * <p> + * If another telemetry API is in-flight, then {@code timeoutMs} should be returned as the + * maximum wait time. + * + * @param timeoutMs The timeout for the inflight telemetry API call. + * @return remaining time in ms till the telemetry API be attempted again. 
+ */ + long timeToNextUpdate(long timeoutMs); + + /** + * Return the telemetry request based on client state i.e. determine if + * {@link org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest} or + * {@link org.apache.kafka.common.requests.PushTelemetryRequest} be constructed. + * + * @return request for telemetry API call. + */ + Optional<Builder<?>> createRequest(); + + /** + * Handle successful response for get telemetry subscriptions request. + * + * @param response subscriptions telemetry API response + */ + void handleResponse(GetTelemetrySubscriptionsResponse response); Review Comment: should we be more consistent with the failed response handler? `handleTelemetricSubscriptionResponse` ########## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetrySender.java: ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.common.telemetry.internals; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.requests.AbstractRequest.Builder; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryResponse; + +import java.util.Optional; + +/** + * The interface used by the `NetworkClient` to send telemetry requests. + */ +public interface ClientTelemetrySender extends AutoCloseable { + + /** + * Return the next time when the telemetry API should be attempted (i.e., interval time has elapsed). + * <p> + * If another telemetry API is in-flight, then {@code timeoutMs} should be returned as the + * maximum wait time. + * + * @param timeoutMs The timeout for the inflight telemetry API call. + * @return remaining time in ms till the telemetry API be attempted again. + */ + long timeToNextUpdate(long timeoutMs); + + /** + * Return the telemetry request based on client state i.e. determine if + * {@link org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest} or + * {@link org.apache.kafka.common.requests.PushTelemetryRequest} be constructed. + * + * @return request for telemetry API call. + */ + Optional<Builder<?>> createRequest(); + + /** + * Handle successful response for get telemetry subscriptions request. + * + * @param response subscriptions telemetry API response + */ + void handleResponse(GetTelemetrySubscriptionsResponse response); + + /** + * Handle successful response for push telemetry request. + * + * @param response push telemetry API response + */ + void handleResponse(PushTelemetryResponse response); + + /** + * Handle response for failed get telemetry subscriptions request. + * + * @param kafkaException the fatal exception. 
+ */ + void handleFailedGetTelemetrySubscriptionsRequest(KafkaException kafkaException); Review Comment: The only way network client fails a Request is really NetworkException I think, which is handled in the networkClient. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
