This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new b4bf0bf693f KAFKA-19506 Implement dynamic compression type selection and fallback for client telemetry (#20144)
b4bf0bf693f is described below

commit b4bf0bf693f98d21312c4a3971911d5a60f67bd6
Author: Kaushik Raina <103954755+k-ra...@users.noreply.github.com>
AuthorDate: Sat Aug 23 22:49:19 2025 +0530

KAFKA-19506 Implement dynamic compression type selection and fallback for client telemetry (#20144)

#### Summary
This PR implements dynamic compression type selection and a fallback mechanism for client telemetry, to handle cases where compression libraries are not available on the client classpath.

#### Problem
Currently, when a compression library is missing (e.g., NoClassDefFoundError), the client telemetry system catches the generic Throwable but does not learn from the failure. This means the same unsupported compression type is attempted on every telemetry push.

#### Solution
This PR introduces a comprehensive fallback mechanism:
- Specific exception handling: replace generic Throwable catching with specific exceptions (IOException, NoClassDefFoundError)
- Unsupported compression tracking: add an unsupportedCompressionTypes collection to track compression types that have failed due to missing libraries
- Dynamic selection: enhance ClientTelemetryUtils.preferredCompressionType() to accept an unsupported-types parameter and filter out known problematic compression types
- Thread safety: use ConcurrentHashMap.newKeySet() for thread-safe access to the unsupported-types collection
- Improved logging: include exception details in log messages for better debugging

#### Key Changes
- Modified createPushRequest() to track failed compression types in unsupportedCompressionTypes
- Updated ClientTelemetryUtils.preferredCompressionType() to filter out unsupported types
- Enhanced exception handling with specific exception types instead of Throwable

#### Testing
- Added appropriate unit tests
- Testing
  Apache Kafka locally; logs below:

```
✗ cat ~/Desktop/kafka-client.log | grep " org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter"
2025-07-17 07:56:52:602 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry subscription request with client instance id AAAAAAAAAAAAAAAAAAAAAA
2025-07-17 07:56:52:602 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from SUBSCRIPTION_NEEDED to SUBSCRIPTION_IN_PROGRESS
2025-07-17 07:56:52:640 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from SUBSCRIPTION_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:56:52:640 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Telemetry subscription push interval value from broker was 5000; to stagger requests the first push interval is being adjusted to 4551
2025-07-17 07:56:52:640 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Updating subscription - subscription: ClientTelemetrySubscription{clientInstanceId=aVd3fzviRGSgEuAWNY5mMA, subscriptionId=1650084878, pushIntervalMs=5000, acceptedCompressionTypes=[zstd, lz4, snappy, none], deltaTemporality=true, selector=org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils$$Lambda$308/0x00000005011ce470@2f16e398}; intervalMs: 4551, lastRequestMs: 1752739012639
2025-07-17 07:56:52:640 [kafka-producer-network-thread | kr-kafka-producer] INFO org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Client telemetry registered with client instance id: aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:56:57:196 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:56:57:196 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:56:57:224 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Compression library zstd not found, sending uncompressed data
	at org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:722)
	at org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:703)
	at org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createRequest(ClientTelemetryReporter.java:389)
2025-07-17 07:56:57:295 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:02:296 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:02:297 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:02:300 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Compression library lz4 not found, sending uncompressed data
	at org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:722)
	at org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:703)
	at org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createRequest(ClientTelemetryReporter.java:389)
2025-07-17 07:57:02:329 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:07:329 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:07:330 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:07:331 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Compression library snappy not found, sending uncompressed data
	at org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:722)
	at org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:703)
	at org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createRequest(ClientTelemetryReporter.java:389)
2025-07-17 07:57:07:344 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:12:346 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:12:346 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:12:400 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:17:402 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:17:402 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:17:442 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:22:442 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:22:442 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:22:508 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:27:512 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:27:512 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:27:555 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:32:555 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:32:555 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:32:578 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:37:580 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:37:580 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:37:606 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:42:606 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:42:606 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:42:646 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:47:647 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:47:647 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:47:673 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:52:673 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:52:673 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:52:711 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:57:711 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:57:711 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:57:765 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
```

Reviewers: Apoorv Mittal <apoorvmitta...@gmail.com>, Chia-Ping Tsai <chia7...@gmail.com>
---
 .../internals/ClientTelemetryReporter.java         |  20 +++-
 .../telemetry/internals/ClientTelemetryUtils.java  |  25 ++--
 .../internals/ClientTelemetryReporterTest.java     | 131 +++++++++++++++++++++
 .../internals/ClientTelemetryUtilsTest.java        |  43 ++++---
 4 files changed, 195 insertions(+), 24 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
index e0491943fef..bef65977be4 100644
--- a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
+++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
@@ -50,6 +50,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.StringJoiner;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
@@ -269,6 +270,7 @@ public class ClientTelemetryReporter implements MetricsReporter {
         private static final double INITIAL_PUSH_JITTER_LOWER = 0.5;
         private static final double INITIAL_PUSH_JITTER_UPPER = 1.5;
 
+        private final Set<CompressionType> unsupportedCompressionTypes = ConcurrentHashMap.newKeySet();
         private final ReadWriteLock lock = new ReentrantReadWriteLock();
         private final Condition subscriptionLoaded = lock.writeLock().newCondition();
 
         /*
@@ -713,12 +715,26 @@ public class ClientTelemetryReporter implements MetricsReporter {
                 return Optional.empty();
             }
 
-            CompressionType compressionType = ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes());
+            CompressionType compressionType = ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes(), unsupportedCompressionTypes);
             ByteBuffer compressedPayload;
             try {
                 compressedPayload = ClientTelemetryUtils.compress(payload, compressionType);
             } catch (Throwable e) {
-                log.debug("Failed to compress telemetry payload for compression: {}, sending uncompressed data", compressionType);
+                // Distinguish between recoverable errors (NoClassDefFoundError for missing compression libs)
+                // and fatal errors (OutOfMemoryError, etc.) that should terminate telemetry.
+                if (e instanceof Error && !(e instanceof NoClassDefFoundError) && !(e.getCause() instanceof NoClassDefFoundError)) {
+                    lock.writeLock().lock();
+                    try {
+                        state = ClientTelemetryState.TERMINATED;
+                    } finally {
+                        lock.writeLock().unlock();
+                    }
+                    log.error("Unexpected error occurred while compressing telemetry payload for compression: {}, stopping client telemetry", compressionType, e);
+                    throw new KafkaException("Unexpected compression error", e);
+                }
+
+                log.debug("Failed to compress telemetry payload for compression: {}, sending uncompressed data", compressionType, e);
+                unsupportedCompressionTypes.add(compressionType);
                 compressedPayload = ByteBuffer.wrap(payload.toByteArray());
                 compressionType = CompressionType.NONE;
             }
diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java
index 3c555afb3b0..111b041946c 100644
--- a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java
@@ -39,6 +39,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.function.Predicate;
 
 import io.opentelemetry.proto.metrics.v1.MetricsData;
@@ -181,13 +182,23 @@ public class ClientTelemetryUtils {
         return validateResourceLabel(metadata, MetricsContext.NAMESPACE);
     }
 
-    public static CompressionType preferredCompressionType(List<CompressionType> acceptedCompressionTypes) {
-        if (acceptedCompressionTypes != null && !acceptedCompressionTypes.isEmpty()) {
-            // Broker is providing the compression types in order of preference. Grab the
-            // first one.
-            return acceptedCompressionTypes.get(0);
-        }
-        return CompressionType.NONE;
+    /**
+     * Determines the preferred compression type from broker-accepted types, avoiding unsupported ones.
+     *
+     * @param acceptedCompressionTypes the list of compression types accepted by the broker in order
+     *                                 of preference (must not be null, use empty list if no compression is accepted)
+     * @param unsupportedCompressionTypes the set of compression types that should be avoided due to
+     *                                    missing libraries or previous failures (must not be null)
+     * @return the preferred compression type to use, or {@link CompressionType#NONE} if no acceptable
+     *         compression type is available
+     */
+    public static CompressionType preferredCompressionType(List<CompressionType> acceptedCompressionTypes, Set<CompressionType> unsupportedCompressionTypes) {
+        // Broker is providing the compression types in order of preference. Grab the
+        // first one that's supported.
+        return acceptedCompressionTypes.stream()
+            .filter(t -> !unsupportedCompressionTypes.contains(t))
+            .findFirst()
+            .orElse(CompressionType.NONE);
     }
 
     public static ByteBuffer compress(MetricsData metrics, CompressionType compressionType) throws IOException {
diff --git a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
index b708b4eeb60..c06e853b073 100644
--- a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.telemetry.internals;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
 import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
@@ -63,8 +64,10 @@ import static
org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 
 public class ClientTelemetryReporterTest {
@@ -413,6 +416,134 @@ public class ClientTelemetryReporterTest {
         }
     }
 
+    @Test
+    public void testCreateRequestPushCompressionFallbackToNextType() {
+        clientTelemetryReporter.configure(configs);
+        clientTelemetryReporter.contextChange(metricsContext);
+
+        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
+        assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+        assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+
+        // Set up subscription with multiple compression types: GZIP -> LZ4 -> SNAPPY
+        ClientTelemetryReporter.ClientTelemetrySubscription subscription = new ClientTelemetryReporter.ClientTelemetrySubscription(
+            uuid, 1234, 20000, List.of(CompressionType.GZIP, CompressionType.LZ4, CompressionType.SNAPPY), true, null);
+        telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
+
+        try (MockedStatic<ClientTelemetryUtils> mockedCompress = Mockito.mockStatic(ClientTelemetryUtils.class, new CallsRealMethods())) {
+            // First request: GZIP fails with NoClassDefFoundError, should use NONE for this request
+            mockedCompress.when(() -> ClientTelemetryUtils.compress(any(), eq(CompressionType.GZIP))).thenThrow(new NoClassDefFoundError("GZIP not available"));
+
+            Optional<AbstractRequest.Builder<?>> requestOptional = telemetrySender.createRequest();
+            assertNotNull(requestOptional);
+            assertTrue(requestOptional.isPresent());
+            assertInstanceOf(PushTelemetryRequest.class, requestOptional.get().build());
+            PushTelemetryRequest request = (PushTelemetryRequest) requestOptional.get().build();
+
+            // Should fallback to NONE for this request (GZIP gets cached as unsupported)
+            assertEquals(CompressionType.NONE.id, request.data().compressionType());
+            assertEquals(ClientTelemetryState.PUSH_IN_PROGRESS, telemetrySender.state());
+
+            // Reset state for next request
+            assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+
+            // Second request: LZ4 is selected (since GZIP is now cached as unsupported), LZ4 fails, should use NONE
+            // Note that some libraries eg. LZ4 return KafkaException with cause as NoClassDefFoundError
+            mockedCompress.when(() -> ClientTelemetryUtils.compress(any(), eq(CompressionType.LZ4))).thenThrow(new KafkaException(new NoClassDefFoundError("LZ4 not available")));
+
+            requestOptional = telemetrySender.createRequest();
+            assertNotNull(requestOptional);
+            assertTrue(requestOptional.isPresent());
+            assertInstanceOf(PushTelemetryRequest.class, requestOptional.get().build());
+            request = (PushTelemetryRequest) requestOptional.get().build();
+
+            // Should fallback to NONE for this request (LZ4 gets cached as unsupported)
+            assertEquals(CompressionType.NONE.id, request.data().compressionType());
+            assertEquals(ClientTelemetryState.PUSH_IN_PROGRESS, telemetrySender.state());
+
+            // Reset state for next request
+            assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+
+            // Third request: SNAPPY is selected (since GZIP and LZ4 are now cached as unsupported), SNAPPY fails, should use NONE
+            mockedCompress.when(() -> ClientTelemetryUtils.compress(any(), eq(CompressionType.SNAPPY))).thenThrow(new NoClassDefFoundError("SNAPPY not available"));
+
+            requestOptional = telemetrySender.createRequest();
+            assertNotNull(requestOptional);
+            assertTrue(requestOptional.isPresent());
+            assertInstanceOf(PushTelemetryRequest.class, requestOptional.get().build());
+            request = (PushTelemetryRequest) requestOptional.get().build();
+
+            // Should fallback to NONE for this request (SNAPPY gets cached as unsupported)
+            assertEquals(CompressionType.NONE.id, request.data().compressionType());
+            assertEquals(ClientTelemetryState.PUSH_IN_PROGRESS, telemetrySender.state());
+
+            // Reset state for next request
+            assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+
+            // Fourth request: All compression types are now cached as unsupported, should use NONE directly
+            requestOptional = telemetrySender.createRequest();
+            assertNotNull(requestOptional);
+            assertTrue(requestOptional.isPresent());
+            assertInstanceOf(PushTelemetryRequest.class, requestOptional.get().build());
+            request = (PushTelemetryRequest) requestOptional.get().build();
+
+            // Should use NONE directly (no compression types are supported)
+            assertEquals(CompressionType.NONE.id, request.data().compressionType());
+            assertEquals(ClientTelemetryState.PUSH_IN_PROGRESS, telemetrySender.state());
+        }
+    }
+
+    @Test
+    public void testCreateRequestPushCompressionFallbackAndTermination() {
+        clientTelemetryReporter.configure(configs);
+        clientTelemetryReporter.contextChange(metricsContext);
+
+        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
+        assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+        assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+
+        // Set up subscription with ZSTD compression type
+        ClientTelemetryReporter.ClientTelemetrySubscription subscription = new ClientTelemetryReporter.ClientTelemetrySubscription(
+            uuid, 1234, 20000, List.of(CompressionType.ZSTD, CompressionType.LZ4), true, null);
+        telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
+
+        try (MockedStatic<ClientTelemetryUtils> mockedCompress = Mockito.mockStatic(ClientTelemetryUtils.class, new CallsRealMethods())) {
+
+            // === Test 1: NoClassDefFoundError fallback (recoverable) ===
+            mockedCompress.when(() -> ClientTelemetryUtils.compress(any(), eq(CompressionType.ZSTD)))
+                .thenThrow(new NoClassDefFoundError("com/github/luben/zstd/BufferPool"));
+
+            assertEquals(ClientTelemetryState.PUSH_NEEDED, telemetrySender.state());
+
+            Optional<AbstractRequest.Builder<?>> request1 = telemetrySender.createRequest();
+            assertNotNull(request1);
+            assertTrue(request1.isPresent());
+            assertInstanceOf(PushTelemetryRequest.class, request1.get().build());
+            PushTelemetryRequest pushRequest1 = (PushTelemetryRequest) request1.get().build();
+            assertEquals(CompressionType.NONE.id, pushRequest1.data().compressionType()); // Fallback to NONE
+            assertEquals(ClientTelemetryState.PUSH_IN_PROGRESS, telemetrySender.state());
+
+            // Reset state (simulate successful response handling)
+            assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+
+            // === Test 2: OutOfMemoryError causes termination (non-recoverable Error) ===
+            mockedCompress.reset();
+            mockedCompress.when(() -> ClientTelemetryUtils.compress(any(), eq(CompressionType.LZ4)))
+                .thenThrow(new OutOfMemoryError("Out of memory during compression"));
+
+            assertEquals(ClientTelemetryState.PUSH_NEEDED, telemetrySender.state());
+
+            assertThrows(KafkaException.class, () -> telemetrySender.createRequest());
+            assertEquals(ClientTelemetryState.TERMINATED, telemetrySender.state());
+
+            // === Test 3: After termination, no more requests ===
+            Optional<AbstractRequest.Builder<?>> request3 = telemetrySender.createRequest();
+            assertNotNull(request3);
+            assertFalse(request3.isPresent()); // No request created
+            assertEquals(ClientTelemetryState.TERMINATED, telemetrySender.state()); // State remains TERMINATED
+        }
+    }
+
     @Test
     public void testHandleResponseGetSubscriptions() {
         ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
diff --git a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java
index 41679bed3f7..47925ff8e0a 100644
--- a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java
@@ -30,10 +30,9 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.time.Instant;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 import java.util.function.Predicate;
 
 import io.opentelemetry.proto.metrics.v1.Metric;
@@ -69,12 +68,12 @@ public class ClientTelemetryUtilsTest {
     @Test
     public void testGetSelectorFromRequestedMetrics() {
         // no metrics selector
-        assertEquals(ClientTelemetryUtils.SELECTOR_NO_METRICS, ClientTelemetryUtils.getSelectorFromRequestedMetrics(Collections.emptyList()));
+        assertEquals(ClientTelemetryUtils.SELECTOR_NO_METRICS, ClientTelemetryUtils.getSelectorFromRequestedMetrics(List.of()));
         assertEquals(ClientTelemetryUtils.SELECTOR_NO_METRICS, ClientTelemetryUtils.getSelectorFromRequestedMetrics(null));
         // all metrics selector
-        assertEquals(ClientTelemetryUtils.SELECTOR_ALL_METRICS, ClientTelemetryUtils.getSelectorFromRequestedMetrics(Collections.singletonList("*")));
+        assertEquals(ClientTelemetryUtils.SELECTOR_ALL_METRICS, ClientTelemetryUtils.getSelectorFromRequestedMetrics(List.of("*")));
         // specific metrics selector
-        Predicate<? super MetricKeyable> selector = ClientTelemetryUtils.getSelectorFromRequestedMetrics(Arrays.asList("metric1", "metric2"));
+        Predicate<? super MetricKeyable> selector = ClientTelemetryUtils.getSelectorFromRequestedMetrics(List.of("metric1", "metric2"));
         assertNotEquals(ClientTelemetryUtils.SELECTOR_NO_METRICS, selector);
         assertNotEquals(ClientTelemetryUtils.SELECTOR_ALL_METRICS, selector);
         assertTrue(selector.test(new MetricKey("metric1.test")));
@@ -86,7 +85,7 @@ public class ClientTelemetryUtilsTest {
     @Test
     public void testGetCompressionTypesFromAcceptedList() {
         assertEquals(0, ClientTelemetryUtils.getCompressionTypesFromAcceptedList(null).size());
-        assertEquals(0, ClientTelemetryUtils.getCompressionTypesFromAcceptedList(Collections.emptyList()).size());
+        assertEquals(0, ClientTelemetryUtils.getCompressionTypesFromAcceptedList(List.of()).size());
 
         List<Byte> compressionTypes = new ArrayList<>();
         compressionTypes.add(CompressionType.GZIP.id);
@@ -123,10 +122,24 @@ public class ClientTelemetryUtilsTest {
 
     @Test
     public void testPreferredCompressionType() {
-        assertEquals(CompressionType.NONE, ClientTelemetryUtils.preferredCompressionType(Collections.emptyList()));
-        assertEquals(CompressionType.NONE, ClientTelemetryUtils.preferredCompressionType(null));
-        assertEquals(CompressionType.NONE, ClientTelemetryUtils.preferredCompressionType(Arrays.asList(CompressionType.NONE, CompressionType.GZIP)));
-        assertEquals(CompressionType.GZIP, ClientTelemetryUtils.preferredCompressionType(Arrays.asList(CompressionType.GZIP, CompressionType.NONE)));
+        // Test with no unsupported types
+        assertEquals(CompressionType.NONE, ClientTelemetryUtils.preferredCompressionType(List.of(), Set.of()));
+        assertEquals(CompressionType.NONE, ClientTelemetryUtils.preferredCompressionType(List.of(CompressionType.NONE, CompressionType.GZIP), Set.of()));
+        assertEquals(CompressionType.GZIP, ClientTelemetryUtils.preferredCompressionType(List.of(CompressionType.GZIP, CompressionType.NONE), Set.of()));
+
+        // Test unsupported type filtering (returns first available type, or NONE if all are unsupported)
+        assertEquals(CompressionType.LZ4, ClientTelemetryUtils.preferredCompressionType(List.of(CompressionType.GZIP, CompressionType.LZ4), Set.of(CompressionType.GZIP)));
+        assertEquals(CompressionType.SNAPPY, ClientTelemetryUtils.preferredCompressionType(List.of(CompressionType.GZIP, CompressionType.LZ4, CompressionType.SNAPPY), Set.of(CompressionType.GZIP, CompressionType.LZ4)));
+        assertEquals(CompressionType.NONE, ClientTelemetryUtils.preferredCompressionType(List.of(CompressionType.GZIP, CompressionType.LZ4), Set.of(CompressionType.GZIP, CompressionType.LZ4)));
+
+        // Test edge case: no match between requested and supported types
+        assertEquals(CompressionType.GZIP, ClientTelemetryUtils.preferredCompressionType(List.of(CompressionType.GZIP, CompressionType.LZ4), Set.of(CompressionType.SNAPPY)));
+
+        // Test NullPointerException for null parameters
+        assertThrows(NullPointerException.class, () ->
+            ClientTelemetryUtils.preferredCompressionType(null, Set.of()));
+        assertThrows(NullPointerException.class, () ->
+            ClientTelemetryUtils.preferredCompressionType(List.of(CompressionType.GZIP, CompressionType.NONE), null));
     }
 
     @ParameterizedTest
@@ -150,19 +163,19 @@ public class ClientTelemetryUtilsTest {
     private MetricsData getMetricsData() {
         List<Metric> metricsList = new ArrayList<>();
         metricsList.add(SinglePointMetric.sum(
-            new MetricKey("metricName"), 1.0, true, Instant.now(), null, Collections.emptySet())
+            new MetricKey("metricName"), 1.0, true, Instant.now(), null, Set.of())
             .builder().build());
         metricsList.add(SinglePointMetric.sum(
-            new MetricKey("metricName1"), 100.0, false, Instant.now(), Instant.now(), Collections.emptySet())
+            new MetricKey("metricName1"), 100.0, false, Instant.now(), Instant.now(), Set.of())
             .builder().build());
         metricsList.add(SinglePointMetric.deltaSum(
-            new MetricKey("metricName2"), 1.0, true, Instant.now(), Instant.now(), Collections.emptySet())
+            new MetricKey("metricName2"), 1.0, true, Instant.now(), Instant.now(), Set.of())
             .builder().build());
         metricsList.add(SinglePointMetric.gauge(
-            new MetricKey("metricName3"), 1.0, Instant.now(), Collections.emptySet())
+            new MetricKey("metricName3"), 1.0, Instant.now(), Set.of())
             .builder().build());
         metricsList.add(SinglePointMetric.gauge(
-            new MetricKey("metricName4"), Long.valueOf(100), Instant.now(), Collections.emptySet())
+            new MetricKey("metricName4"), Long.valueOf(100), Instant.now(), Set.of())
             .builder().build());
 
         MetricsData.Builder builder = MetricsData.newBuilder();
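For readers skimming the diff, the core selection-plus-caching pattern can be sketched standalone. This is a minimal illustration, not Kafka's actual code: `CompressionFallbackSketch` and its nested `CompressionType` enum are hypothetical stand-ins (the real types live in `org.apache.kafka.common.record` and `ClientTelemetryUtils`), but the `ConcurrentHashMap.newKeySet()` tracking and stream-based first-match selection mirror the mechanism the patch describes:

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the fallback pattern: remember compression types that
// failed (e.g. NoClassDefFoundError from a missing library) and never retry them.
public class CompressionFallbackSketch {
    // Stand-in enum, not Kafka's CompressionType.
    enum CompressionType { ZSTD, LZ4, SNAPPY, GZIP, NONE }

    // Thread-safe set shared by the network thread and any other callers.
    private final Set<CompressionType> unsupported = ConcurrentHashMap.newKeySet();

    // Broker lists accepted types in preference order; take the first one
    // that has not previously failed, falling back to NONE.
    CompressionType preferred(List<CompressionType> accepted) {
        return accepted.stream()
            .filter(t -> !unsupported.contains(t))
            .findFirst()
            .orElse(CompressionType.NONE);
    }

    // Called from the catch block on a recoverable compression failure,
    // so the next push skips this type instead of failing again.
    void markUnsupported(CompressionType t) {
        unsupported.add(t);
    }

    public static void main(String[] args) {
        CompressionFallbackSketch s = new CompressionFallbackSketch();
        List<CompressionType> accepted = List.of(CompressionType.ZSTD, CompressionType.LZ4);
        System.out.println(s.preferred(accepted)); // ZSTD
        s.markUnsupported(CompressionType.ZSTD);   // simulate missing zstd library
        System.out.println(s.preferred(accepted)); // LZ4
        s.markUnsupported(CompressionType.LZ4);    // simulate missing lz4 library
        System.out.println(s.preferred(accepted)); // NONE
    }
}
```

This matches the behavior visible in the logs above: each missing library triggers exactly one "not found, sending uncompressed data" entry, after which that type is never selected again.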