This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e0e7323f9b5 KAFKA-15768: Fix getOnlyPartitionResult to handle FailedQueryResult (#22123)
e0e7323f9b5 is described below

commit e0e7323f9b5f945012ca53630a742e83cda64b0e
Author: Gavin Wang <[email protected]>
AuthorDate: Tue May 5 12:36:32 2026 -0400

    KAFKA-15768: Fix getOnlyPartitionResult to handle FailedQueryResult (#22123)
    
    `getOnlyPartitionResult()` was silently discarding `FailedQueryResult`
    entries by filtering only for successful results, making failures
    indistinguishable from "not found". Fix the filter to treat failed
    results as meaningful so callers can inspect the failure via
    `isFailure()`/`getFailureReason()`/`getFailureMessage()`.
    
    Reviewers: Evan Zhou <[email protected]>, Bill Bejeck <[email protected]>
---
 .../integration/IQv2StoreIntegrationTest.java      | 23 +++++++++++++++++++++-
 .../kafka/streams/query/StateQueryResult.java      |  3 +--
 .../kafka/streams/query/StateQueryResultTest.java  |  9 +++++++++
 3 files changed, 32 insertions(+), 3 deletions(-)

diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
index 42bd76d565d..7af75d07184 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
@@ -807,7 +807,7 @@ public class IQv2StoreIntegrationTest {
                                 shouldHandleTimestampedKeyQuery(2, ValueAndTimestamp.make(5, -1L));
                             }
                         } else {
-                            assertThrows(AssertionError.class, () -> shouldHandleTimestampedKeyQuery(2, ValueAndTimestamp.make(5, WINDOW_START + Duration.ofMinutes(2).toMillis() * 5)));
+                            shouldHandleFailedTimestampedKeyQuery(2);
                             assertThrows(AssertionError.class, () -> shouldHandleTimestampedRangeQueries(false));
                         }
 
@@ -1710,6 +1710,27 @@ public class IQv2StoreIntegrationTest {
         assertThat(queryResult.getPosition(), is(POSITION_0));
     }
 
+    public <V> void shouldHandleFailedTimestampedKeyQuery(final Integer key) {
+        final TimestampedKeyQuery<Integer, V> query = TimestampedKeyQuery.withKey(key);
+        final StateQueryRequest<ValueAndTimestamp<V>> request =
+                inStore(STORE_NAME)
+                        .withQuery(query)
+                        .withPartitions(Set.of(0))
+                        .withPositionBound(PositionBound.at(INPUT_POSITION));
+
+        final StateQueryResult<ValueAndTimestamp<V>> result =
+                IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+
+        final QueryResult<ValueAndTimestamp<V>> queryResult =
+                result.getOnlyPartitionResult();
+
+        assertThat(queryResult.isFailure(), is(true));
+        assertThat(queryResult.getFailureReason(), is(FailureReason.UNKNOWN_QUERY_TYPE));
+        assertThat(queryResult.getFailureMessage(), containsString("TimestampedKeyQuery"));
+
+        assertThrows(IllegalArgumentException.class, queryResult::getResult);
+    }
+
     public <V> void shouldHandleRangeQuery(
         final Optional<Integer> lower,
         final Optional<Integer> upper,
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java b/streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java
index 5819710ebab..7ba8ca94af2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java
@@ -69,8 +69,7 @@ public class StateQueryResult<R> {
             partitionResults
                 .values()
                 .stream()
-                .filter(QueryResult::isSuccess)
-                .filter(r -> r.getResult() != null)
+                .filter(r -> r.isFailure() || r.getResult() != null)
                 .collect(Collectors.toList());
 
         if (nonempty.size() > 1) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/query/StateQueryResultTest.java b/streams/src/test/java/org/apache/kafka/streams/query/StateQueryResultTest.java
index f263e43bfa3..697f6fca28c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/query/StateQueryResultTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/query/StateQueryResultTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.query;
 
+import org.apache.kafka.streams.query.internals.FailedQueryResult;
 import org.apache.kafka.streams.query.internals.SucceededQueryResult;
 
 import org.junit.jupiter.api.BeforeEach;
@@ -32,6 +33,7 @@ class StateQueryResultTest {
     StateQueryResult<String> stringStateQueryResult;
     final QueryResult<String> noResultsFound = new SucceededQueryResult<>(null);
     final QueryResult<String> validResult = new SucceededQueryResult<>("Foo");
+    final QueryResult<String> invalidResult = new FailedQueryResult<>(FailureReason.DOES_NOT_EXIST, "Does not exist");
 
     @BeforeEach
     public void setUp() {
@@ -52,6 +54,13 @@ class StateQueryResultTest {
         assertThat("Valid query results still works", result.getResult(), is("Foo"));
     }
 
+    @Test
+    void getOnlyPartitionResultWithSingleFailureResultTest() {
+        stringStateQueryResult.addResult(0, invalidResult);
+        final QueryResult<String> result = stringStateQueryResult.getOnlyPartitionResult();
+        assertThat("Invalid query result should be a failure", result.isFailure(), is(true));
+    }
+
     @Test
     void getOnlyPartitionResultMultipleResults() {
         stringStateQueryResult.addResult(0, validResult);

Reply via email to