This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b4bf0bf693f KAFKA-19506 Implement dynamic compression type selection 
and fallback for client telemetry (#20144)
b4bf0bf693f is described below

commit b4bf0bf693f98d21312c4a3971911d5a60f67bd6
Author: Kaushik Raina <103954755+k-ra...@users.noreply.github.com>
AuthorDate: Sat Aug 23 22:49:19 2025 +0530

    KAFKA-19506 Implement dynamic compression type selection and fallback for 
client telemetry (#20144)
    
    #### Summary
    This PR implements dynamic compression type selection and fallback
    mechanism for client telemetry to handle cases where compression
    libraries are not available on the client classpath.
    
    #### Problem
    Currently, when a compression library is missing (e.g.,
    NoClassDefFoundError), the client telemetry system catches the generic
    Throwable but doesn't learn from the failure. This means, the same
    unsupported compression type will be attempted on every telemetry push
    
    #### Solution
    This PR introduces a comprehensive fallback mechanism:
    - Specific Exception Handling: Replace generic Throwable catching with
    specific exceptions (IOException, NoClassDefFoundError)
    - Unsupported Compression Tracking: Add unsupportedCompressionTypes
    collection to track compression types that have failed due to missing
    libraries
    - Dynamic Selection: Enhance
    ClientTelemetryUtils.preferredCompressionType() to accept an unsupported
    types parameter and filter out known problematic compression types
    - Thread Safety: Use ConcurrentHashMap.newKeySet() for thread-safe
    access to the unsupported types collection
    - Improved Logging: Include exception details in log messages for better
    debugging
    
    #### Key Changes
    - Modified createPushRequest() to track failed compression types in
    unsupportedCompressionTypes
    - Updated ClientTelemetryUtils.preferredCompressionType() to filter out
    unsupported types
    - Enhanced exception handling with specific exception types instead of
    Throwable
    
    #### Testing
    - Added appropriate Unit tests
    - Testing apache kafka on local logs:
    ```
    ✗ cat ~/Desktop/kafka-client.log | grep "
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter"
    2025-07-17 07:56:52:602 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Creating telemetry subscription request with client instance id
    AAAAAAAAAAAAAAAAAAAAAA
    2025-07-17 07:56:52:602 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Setting telemetry state from SUBSCRIPTION_NEEDED to
    SUBSCRIPTION_IN_PROGRESS
    2025-07-17 07:56:52:640 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Setting telemetry state from SUBSCRIPTION_IN_PROGRESS to PUSH_NEEDED
    2025-07-17 07:56:52:640 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Telemetry subscription push interval value from broker was 5000; to
    stagger requests the first push interval is being adjusted to 4551
    2025-07-17 07:56:52:640 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Updating subscription - subscription:
    ClientTelemetrySubscription{clientInstanceId=aVd3fzviRGSgEuAWNY5mMA,
    subscriptionId=1650084878, pushIntervalMs=5000,
    acceptedCompressionTypes=[zstd, lz4, snappy, none],
    deltaTemporality=true,
    
selector=org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils$$Lambda$308/0x00000005011ce470@2f16e398};
    intervalMs: 4551, lastRequestMs: 1752739012639
    2025-07-17 07:56:52:640 [kafka-producer-network-thread |
    kr-kafka-producer] INFO
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Client telemetry registered with client instance id:
    aVd3fzviRGSgEuAWNY5mMA
    2025-07-17 07:56:57:196 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Creating telemetry push request with client instance id
    aVd3fzviRGSgEuAWNY5mMA
    2025-07-17 07:56:57:196 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
    2025-07-17 07:56:57:224 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Compression library zstd not found, sending uncompressed data
            at
    
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:722)
            at
    
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:703)
            at
    
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createRequest(ClientTelemetryReporter.java:389)
    2025-07-17 07:56:57:295 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
    2025-07-17 07:57:02:296 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Creating telemetry push request with client instance id
    aVd3fzviRGSgEuAWNY5mMA
    2025-07-17 07:57:02:297 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
    2025-07-17 07:57:02:300 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Compression library lz4 not found, sending uncompressed data
            at
    
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:722)
            at
    
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:703)
            at
    
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createRequest(ClientTelemetryReporter.java:389)
    2025-07-17 07:57:02:329 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
    2025-07-17 07:57:07:329 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Creating telemetry push request with client instance id
    aVd3fzviRGSgEuAWNY5mMA
    2025-07-17 07:57:07:330 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
    2025-07-17 07:57:07:331 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Compression library snappy not found, sending uncompressed data
            at
    
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:722)
            at
    
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:703)
            at
    
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createRequest(ClientTelemetryReporter.java:389)
    2025-07-17 07:57:07:344 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
    2025-07-17 07:57:12:346 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Creating telemetry push request with client instance id
    aVd3fzviRGSgEuAWNY5mMA
    2025-07-17 07:57:12:346 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
    2025-07-17 07:57:12:400 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
    2025-07-17 07:57:17:402 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Creating telemetry push request with client instance id
    aVd3fzviRGSgEuAWNY5mMA
    2025-07-17 07:57:17:402 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
    2025-07-17 07:57:17:442 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
    2025-07-17 07:57:22:442 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Creating telemetry push request with client instance id
    aVd3fzviRGSgEuAWNY5mMA
    2025-07-17 07:57:22:442 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
    2025-07-17 07:57:22:508 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
    2025-07-17 07:57:27:512 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Creating telemetry push request with client instance id
    aVd3fzviRGSgEuAWNY5mMA
    2025-07-17 07:57:27:512 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
    2025-07-17 07:57:27:555 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
    2025-07-17 07:57:32:555 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Creating telemetry push request with client instance id
    aVd3fzviRGSgEuAWNY5mMA
    2025-07-17 07:57:32:555 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
    2025-07-17 07:57:32:578 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
    2025-07-17 07:57:37:580 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Creating telemetry push request with client instance id
    aVd3fzviRGSgEuAWNY5mMA
    2025-07-17 07:57:37:580 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
    2025-07-17 07:57:37:606 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
    2025-07-17 07:57:42:606 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Creating telemetry push request with client instance id
    aVd3fzviRGSgEuAWNY5mMA
    2025-07-17 07:57:42:606 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
    2025-07-17 07:57:42:646 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
    2025-07-17 07:57:47:647 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Creating telemetry push request with client instance id
    aVd3fzviRGSgEuAWNY5mMA
    2025-07-17 07:57:47:647 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
    2025-07-17 07:57:47:673 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
    2025-07-17 07:57:52:673 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Creating telemetry push request with client instance id
    aVd3fzviRGSgEuAWNY5mMA
    2025-07-17 07:57:52:673 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
    2025-07-17 07:57:52:711 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
    2025-07-17 07:57:57:711 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Creating telemetry push request with client instance id
    aVd3fzviRGSgEuAWNY5mMA
    2025-07-17 07:57:57:711 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
    2025-07-17 07:57:57:765 [kafka-producer-network-thread |
    kr-kafka-producer] DEBUG
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
    ```
    
    Reviewers: poorv Mittal <apoorvmitta...@gmail.com>, Chia-Ping Tsai
     <chia7...@gmail.com>
---
 .../internals/ClientTelemetryReporter.java         |  20 +++-
 .../telemetry/internals/ClientTelemetryUtils.java  |  25 ++--
 .../internals/ClientTelemetryReporterTest.java     | 131 +++++++++++++++++++++
 .../internals/ClientTelemetryUtilsTest.java        |  43 ++++---
 4 files changed, 195 insertions(+), 24 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
 
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
index e0491943fef..bef65977be4 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
@@ -50,6 +50,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.StringJoiner;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
@@ -269,6 +270,7 @@ public class ClientTelemetryReporter implements 
MetricsReporter {
         private static final double INITIAL_PUSH_JITTER_LOWER = 0.5;
         private static final double INITIAL_PUSH_JITTER_UPPER = 1.5;
 
+        private final Set<CompressionType> unsupportedCompressionTypes = 
ConcurrentHashMap.newKeySet();
         private final ReadWriteLock lock = new ReentrantReadWriteLock();
         private final Condition subscriptionLoaded = 
lock.writeLock().newCondition();
         /*
@@ -713,12 +715,26 @@ public class ClientTelemetryReporter implements 
MetricsReporter {
                 return Optional.empty();
             }
 
-            CompressionType compressionType = 
ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes());
+            CompressionType compressionType = 
ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes(),
 unsupportedCompressionTypes);
             ByteBuffer compressedPayload;
             try {
                 compressedPayload = ClientTelemetryUtils.compress(payload, 
compressionType);
             } catch (Throwable e) {
-                log.debug("Failed to compress telemetry payload for 
compression: {}, sending uncompressed data", compressionType);
+                // Distinguish between recoverable errors 
(NoClassDefFoundError for missing compression libs) 
+                // and fatal errors (OutOfMemoryError, etc.) that should 
terminate telemetry.
+                if (e instanceof Error && !(e instanceof NoClassDefFoundError) 
&& !(e.getCause() instanceof NoClassDefFoundError)) {
+                    lock.writeLock().lock();
+                    try {
+                        state = ClientTelemetryState.TERMINATED;
+                    } finally {
+                        lock.writeLock().unlock();
+                    }
+                    log.error("Unexpected error occurred while compressing 
telemetry payload for compression: {}, stopping client telemetry", 
compressionType, e);
+                    throw new KafkaException("Unexpected compression error", 
e);
+                }
+
+                log.debug("Failed to compress telemetry payload for 
compression: {}, sending uncompressed data", compressionType, e);
+                unsupportedCompressionTypes.add(compressionType);
                 compressedPayload = ByteBuffer.wrap(payload.toByteArray());
                 compressionType = CompressionType.NONE;
             }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java
 
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java
index 3c555afb3b0..111b041946c 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java
@@ -39,6 +39,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.function.Predicate;
 
 import io.opentelemetry.proto.metrics.v1.MetricsData;
@@ -181,13 +182,23 @@ public class ClientTelemetryUtils {
         return validateResourceLabel(metadata, MetricsContext.NAMESPACE);
     }
 
-    public static CompressionType 
preferredCompressionType(List<CompressionType> acceptedCompressionTypes) {
-        if (acceptedCompressionTypes != null && 
!acceptedCompressionTypes.isEmpty()) {
-            // Broker is providing the compression types in order of 
preference. Grab the
-            // first one.
-            return acceptedCompressionTypes.get(0);
-        }
-        return CompressionType.NONE;
+    /**
+     * Determines the preferred compression type from broker-accepted types, 
avoiding unsupported ones.
+     * 
+     * @param acceptedCompressionTypes the list of compression types accepted 
by the broker in order 
+     *                                of preference (must not be null, use 
empty list if no compression is accepted)
+     * @param unsupportedCompressionTypes the set of compression types that 
should be avoided due to 
+     *                                   missing libraries or previous 
failures (must not be null)
+     * @return the preferred compression type to use, or {@link 
CompressionType#NONE} if no acceptable
+     *         compression type is available
+     */
+    public static CompressionType 
preferredCompressionType(List<CompressionType> acceptedCompressionTypes, 
Set<CompressionType> unsupportedCompressionTypes) {
+        // Broker is providing the compression types in order of preference. 
Grab the
+        // first one that's supported.
+        return acceptedCompressionTypes.stream()
+                .filter(t -> !unsupportedCompressionTypes.contains(t))
+                .findFirst()
+                .orElse(CompressionType.NONE);
     }
 
     public static ByteBuffer compress(MetricsData metrics, CompressionType 
compressionType) throws IOException {
diff --git 
a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
 
b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
index b708b4eeb60..c06e853b073 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.telemetry.internals;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
 import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
@@ -63,8 +64,10 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 
 public class ClientTelemetryReporterTest {
 
@@ -413,6 +416,134 @@ public class ClientTelemetryReporterTest {
         }
     }
 
+    @Test
+    public void testCreateRequestPushCompressionFallbackToNextType() {
+        clientTelemetryReporter.configure(configs);
+        clientTelemetryReporter.contextChange(metricsContext);
+
+        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = 
(ClientTelemetryReporter.DefaultClientTelemetrySender) 
clientTelemetryReporter.telemetrySender();
+        
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+        
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+
+        // Set up subscription with multiple compression types: GZIP -> LZ4 -> 
SNAPPY
+        ClientTelemetryReporter.ClientTelemetrySubscription subscription = new 
ClientTelemetryReporter.ClientTelemetrySubscription(
+            uuid, 1234, 20000, List.of(CompressionType.GZIP, 
CompressionType.LZ4, CompressionType.SNAPPY), true, null);
+        telemetrySender.updateSubscriptionResult(subscription, 
time.milliseconds());
+
+        try (MockedStatic<ClientTelemetryUtils> mockedCompress = 
Mockito.mockStatic(ClientTelemetryUtils.class, new CallsRealMethods())) {
+            // First request: GZIP fails with NoClassDefFoundError, should use 
NONE for this request
+            mockedCompress.when(() -> ClientTelemetryUtils.compress(any(), 
eq(CompressionType.GZIP))).thenThrow(new NoClassDefFoundError("GZIP not 
available"));
+
+            Optional<AbstractRequest.Builder<?>> requestOptional = 
telemetrySender.createRequest();
+            assertNotNull(requestOptional);
+            assertTrue(requestOptional.isPresent());
+            assertInstanceOf(PushTelemetryRequest.class, 
requestOptional.get().build());
+            PushTelemetryRequest request = (PushTelemetryRequest) 
requestOptional.get().build();
+
+            // Should fallback to NONE for this request (GZIP gets cached as 
unsupported)
+            assertEquals(CompressionType.NONE.id, 
request.data().compressionType());
+            assertEquals(ClientTelemetryState.PUSH_IN_PROGRESS, 
telemetrySender.state());
+
+            // Reset state for next request
+            
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+
+            // Second request: LZ4 is selected (since GZIP is now cached as 
unsupported), LZ4 fails, should use NONE
+            // Note that some libraries eg. LZ4 return KafkaException with 
cause as NoClassDefFoundError
+            mockedCompress.when(() -> ClientTelemetryUtils.compress(any(), 
eq(CompressionType.LZ4))).thenThrow(new KafkaException(new 
NoClassDefFoundError("LZ4 not available")));
+
+            requestOptional = telemetrySender.createRequest();
+            assertNotNull(requestOptional);
+            assertTrue(requestOptional.isPresent());
+            assertInstanceOf(PushTelemetryRequest.class, 
requestOptional.get().build());
+            request = (PushTelemetryRequest) requestOptional.get().build();
+
+            // Should fallback to NONE for this request (LZ4 gets cached as 
unsupported)
+            assertEquals(CompressionType.NONE.id, 
request.data().compressionType());
+            assertEquals(ClientTelemetryState.PUSH_IN_PROGRESS, 
telemetrySender.state());
+
+            // Reset state for next request
+            
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+
+            // Third request: SNAPPY is selected (since GZIP and LZ4 are now 
cached as unsupported), SNAPPY fails, should use NONE
+            mockedCompress.when(() -> ClientTelemetryUtils.compress(any(), 
eq(CompressionType.SNAPPY))).thenThrow(new NoClassDefFoundError("SNAPPY not 
available"));
+
+            requestOptional = telemetrySender.createRequest();
+            assertNotNull(requestOptional);
+            assertTrue(requestOptional.isPresent());
+            assertInstanceOf(PushTelemetryRequest.class, 
requestOptional.get().build());
+            request = (PushTelemetryRequest) requestOptional.get().build();
+
+            // Should fallback to NONE for this request (SNAPPY gets cached as 
unsupported)
+            assertEquals(CompressionType.NONE.id, 
request.data().compressionType());
+            assertEquals(ClientTelemetryState.PUSH_IN_PROGRESS, 
telemetrySender.state());
+
+            // Reset state for next request
+            
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+
+            // Fourth request: All compression types are now cached as 
unsupported, should use NONE directly
+            requestOptional = telemetrySender.createRequest();
+            assertNotNull(requestOptional);
+            assertTrue(requestOptional.isPresent());
+            assertInstanceOf(PushTelemetryRequest.class, 
requestOptional.get().build());
+            request = (PushTelemetryRequest) requestOptional.get().build();
+
+            // Should use NONE directly (no compression types are supported)
+            assertEquals(CompressionType.NONE.id, 
request.data().compressionType());
+            assertEquals(ClientTelemetryState.PUSH_IN_PROGRESS, 
telemetrySender.state());
+        }
+    }
+
+    @Test
+    public void testCreateRequestPushCompressionFallbackAndTermination() {
+        clientTelemetryReporter.configure(configs);
+        clientTelemetryReporter.contextChange(metricsContext);
+
+        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = 
(ClientTelemetryReporter.DefaultClientTelemetrySender) 
clientTelemetryReporter.telemetrySender();
+        
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+        
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+
+        // Set up subscription with ZSTD compression type
+        ClientTelemetryReporter.ClientTelemetrySubscription subscription = new 
ClientTelemetryReporter.ClientTelemetrySubscription(
+            uuid, 1234, 20000, List.of(CompressionType.ZSTD, 
CompressionType.LZ4), true, null);
+        telemetrySender.updateSubscriptionResult(subscription, 
time.milliseconds());
+
+        try (MockedStatic<ClientTelemetryUtils> mockedCompress = 
Mockito.mockStatic(ClientTelemetryUtils.class, new CallsRealMethods())) {
+            
+            // === Test 1: NoClassDefFoundError fallback (recoverable) ===
+            mockedCompress.when(() -> ClientTelemetryUtils.compress(any(), 
eq(CompressionType.ZSTD)))
+                    .thenThrow(new 
NoClassDefFoundError("com/github/luben/zstd/BufferPool"));
+            
+            assertEquals(ClientTelemetryState.PUSH_NEEDED, 
telemetrySender.state());
+            
+            Optional<AbstractRequest.Builder<?>> request1 = 
telemetrySender.createRequest();
+            assertNotNull(request1);
+            assertTrue(request1.isPresent());
+            assertInstanceOf(PushTelemetryRequest.class, 
request1.get().build());
+            PushTelemetryRequest pushRequest1 = (PushTelemetryRequest) 
request1.get().build();
+            assertEquals(CompressionType.NONE.id, 
pushRequest1.data().compressionType()); // Fallback to NONE
+            assertEquals(ClientTelemetryState.PUSH_IN_PROGRESS, 
telemetrySender.state());
+            
+            // Reset state (simulate successful response handling)
+            
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+            
+            // === Test 2: OutOfMemoryError causes termination 
(non-recoverable Error) ===
+            mockedCompress.reset();
+            mockedCompress.when(() -> ClientTelemetryUtils.compress(any(), 
eq(CompressionType.LZ4)))
+                    .thenThrow(new OutOfMemoryError("Out of memory during 
compression"));
+            
+            assertEquals(ClientTelemetryState.PUSH_NEEDED, 
telemetrySender.state());
+
+            assertThrows(KafkaException.class, () -> 
telemetrySender.createRequest());
+            assertEquals(ClientTelemetryState.TERMINATED, 
telemetrySender.state());
+            
+            // === Test 3: After termination, no more requests ===
+            Optional<AbstractRequest.Builder<?>> request3 = 
telemetrySender.createRequest();
+            assertNotNull(request3);
+            assertFalse(request3.isPresent()); // No request created
+            assertEquals(ClientTelemetryState.TERMINATED, 
telemetrySender.state()); // State remains TERMINATED
+        }
+    }
+
     @Test
     public void testHandleResponseGetSubscriptions() {
         ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = 
(ClientTelemetryReporter.DefaultClientTelemetrySender) 
clientTelemetryReporter.telemetrySender();
diff --git 
a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java
 
b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java
index 41679bed3f7..47925ff8e0a 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java
@@ -30,10 +30,9 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.time.Instant;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 import java.util.function.Predicate;
 
 import io.opentelemetry.proto.metrics.v1.Metric;
@@ -69,12 +68,12 @@ public class ClientTelemetryUtilsTest {
     @Test
     public void testGetSelectorFromRequestedMetrics() {
         // no metrics selector
-        assertEquals(ClientTelemetryUtils.SELECTOR_NO_METRICS, 
ClientTelemetryUtils.getSelectorFromRequestedMetrics(Collections.emptyList()));
+        assertEquals(ClientTelemetryUtils.SELECTOR_NO_METRICS, 
ClientTelemetryUtils.getSelectorFromRequestedMetrics(List.of()));
         assertEquals(ClientTelemetryUtils.SELECTOR_NO_METRICS, 
ClientTelemetryUtils.getSelectorFromRequestedMetrics(null));
         // all metrics selector
-        assertEquals(ClientTelemetryUtils.SELECTOR_ALL_METRICS, 
ClientTelemetryUtils.getSelectorFromRequestedMetrics(Collections.singletonList("*")));
+        assertEquals(ClientTelemetryUtils.SELECTOR_ALL_METRICS, 
ClientTelemetryUtils.getSelectorFromRequestedMetrics(List.of("*")));
         // specific metrics selector
-        Predicate<? super MetricKeyable> selector = 
ClientTelemetryUtils.getSelectorFromRequestedMetrics(Arrays.asList("metric1", 
"metric2"));
+        Predicate<? super MetricKeyable> selector = 
ClientTelemetryUtils.getSelectorFromRequestedMetrics(List.of("metric1", 
"metric2"));
         assertNotEquals(ClientTelemetryUtils.SELECTOR_NO_METRICS, selector);
         assertNotEquals(ClientTelemetryUtils.SELECTOR_ALL_METRICS, selector);
         assertTrue(selector.test(new MetricKey("metric1.test")));
@@ -86,7 +85,7 @@ public class ClientTelemetryUtilsTest {
     @Test
     public void testGetCompressionTypesFromAcceptedList() {
         assertEquals(0, 
ClientTelemetryUtils.getCompressionTypesFromAcceptedList(null).size());
-        assertEquals(0, 
ClientTelemetryUtils.getCompressionTypesFromAcceptedList(Collections.emptyList()).size());
+        assertEquals(0, 
ClientTelemetryUtils.getCompressionTypesFromAcceptedList(List.of()).size());
 
         List<Byte> compressionTypes = new ArrayList<>();
         compressionTypes.add(CompressionType.GZIP.id);
@@ -123,10 +122,24 @@ public class ClientTelemetryUtilsTest {
 
     @Test
     public void testPreferredCompressionType() {
-        assertEquals(CompressionType.NONE, 
ClientTelemetryUtils.preferredCompressionType(Collections.emptyList()));
-        assertEquals(CompressionType.NONE, 
ClientTelemetryUtils.preferredCompressionType(null));
-        assertEquals(CompressionType.NONE, 
ClientTelemetryUtils.preferredCompressionType(Arrays.asList(CompressionType.NONE,
 CompressionType.GZIP)));
-        assertEquals(CompressionType.GZIP, 
ClientTelemetryUtils.preferredCompressionType(Arrays.asList(CompressionType.GZIP,
 CompressionType.NONE)));
+        // Test with no unsupported types
+        assertEquals(CompressionType.NONE, 
ClientTelemetryUtils.preferredCompressionType(List.of(), Set.of()));
+        assertEquals(CompressionType.NONE, 
ClientTelemetryUtils.preferredCompressionType(List.of(CompressionType.NONE, 
CompressionType.GZIP), Set.of()));
+        assertEquals(CompressionType.GZIP, 
ClientTelemetryUtils.preferredCompressionType(List.of(CompressionType.GZIP, 
CompressionType.NONE), Set.of()));
+
+        // Test unsupported type filtering (returns first available type, or 
NONE if all are unsupported)
+        assertEquals(CompressionType.LZ4, 
ClientTelemetryUtils.preferredCompressionType(List.of(CompressionType.GZIP, 
CompressionType.LZ4), Set.of(CompressionType.GZIP)));
+        assertEquals(CompressionType.SNAPPY, 
ClientTelemetryUtils.preferredCompressionType(List.of(CompressionType.GZIP, 
CompressionType.LZ4, CompressionType.SNAPPY), Set.of(CompressionType.GZIP, 
CompressionType.LZ4)));
+        assertEquals(CompressionType.NONE, 
ClientTelemetryUtils.preferredCompressionType(List.of(CompressionType.GZIP, 
CompressionType.LZ4), Set.of(CompressionType.GZIP, CompressionType.LZ4)));
+        
+        // Test edge case: no match between requested and supported types
+        assertEquals(CompressionType.GZIP, 
ClientTelemetryUtils.preferredCompressionType(List.of(CompressionType.GZIP, 
CompressionType.LZ4), Set.of(CompressionType.SNAPPY)));
+
+        // Test NullPointerException for null parameters
+        assertThrows(NullPointerException.class, () ->
+            ClientTelemetryUtils.preferredCompressionType(null, Set.of()));
+        assertThrows(NullPointerException.class, () ->
+            
ClientTelemetryUtils.preferredCompressionType(List.of(CompressionType.GZIP, 
CompressionType.NONE), null));
     }
 
     @ParameterizedTest
@@ -150,19 +163,19 @@ public class ClientTelemetryUtilsTest {
     private MetricsData getMetricsData() {
         List<Metric> metricsList = new ArrayList<>();
         metricsList.add(SinglePointMetric.sum(
-                        new MetricKey("metricName"), 1.0, true, Instant.now(), 
null, Collections.emptySet())
+                        new MetricKey("metricName"), 1.0, true, Instant.now(), 
null, Set.of())
                 .builder().build());
         metricsList.add(SinglePointMetric.sum(
-                        new MetricKey("metricName1"), 100.0, false, 
Instant.now(),  Instant.now(), Collections.emptySet())
+                        new MetricKey("metricName1"), 100.0, false, 
Instant.now(),  Instant.now(), Set.of())
                 .builder().build());
         metricsList.add(SinglePointMetric.deltaSum(
-                        new MetricKey("metricName2"), 1.0, true, 
Instant.now(), Instant.now(), Collections.emptySet())
+                        new MetricKey("metricName2"), 1.0, true, 
Instant.now(), Instant.now(), Set.of())
                 .builder().build());
         metricsList.add(SinglePointMetric.gauge(
-                        new MetricKey("metricName3"), 1.0, Instant.now(), 
Collections.emptySet())
+                        new MetricKey("metricName3"), 1.0, Instant.now(), 
Set.of())
                 .builder().build());
         metricsList.add(SinglePointMetric.gauge(
-                        new MetricKey("metricName4"), Long.valueOf(100), 
Instant.now(), Collections.emptySet())
+                        new MetricKey("metricName4"), Long.valueOf(100), 
Instant.now(), Set.of())
                 .builder().build());
 
         MetricsData.Builder builder = MetricsData.newBuilder();


Reply via email to