This is an automated email from the ASF dual-hosted git repository. vvcephei pushed a commit to branch iqv2-position-api in repository https://gitbox.apache.org/repos/asf/kafka.git
commit bcfd086517ff94cd6e8e84421c9b3716393e622a Author: John Roesler <[email protected]> AuthorDate: Wed Dec 8 16:12:08 2021 -0600 KAFKA-13522: add position tracking and bounding to IQv2 --- .../org/apache/kafka/streams/KafkaStreams.java | 43 ++++++++--- .../org/apache/kafka/streams/query/Position.java | 12 ++- .../kafka/streams/query/StateQueryResult.java | 12 ++- .../state/internals/CachingKeyValueStore.java | 4 +- .../state/internals/InMemoryKeyValueStore.java | 7 +- .../state/internals/InMemorySessionStore.java | 9 ++- .../state/internals/InMemoryWindowStore.java | 4 +- .../streams/state/internals/MemoryLRUCache.java | 7 ++ .../state/internals/MemoryNavigableLRUCache.java | 4 +- .../state/internals/RocksDBSessionStore.java | 9 ++- .../streams/state/internals/RocksDBStore.java | 17 +++-- .../state/internals/RocksDBWindowStore.java | 9 ++- .../streams/state/internals/StoreQueryUtils.java | 48 +++++++++++- .../integration/IQv2StoreIntegrationTest.java | 87 ++++++++++++++++------ .../integration/utils/IntegrationTestUtils.java | 65 ++++++++++++++++ 15 files changed, 274 insertions(+), 63 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 0169dd9..c095b69 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -1764,19 +1764,33 @@ public class KafkaStreams implements AutoCloseable { ); } final StateQueryResult<R> result = new StateQueryResult<>(); + final Set<Integer> handledPartitions = new HashSet<>(); final Map<String, StateStore> globalStateStores = topologyMetadata.globalStateStores(); if (globalStateStores.containsKey(storeName)) { - final StateStore store = globalStateStores.get(storeName); - final QueryResult<R> r = - store.query( - request.getQuery(), - request.getPositionBound(), - request.executionInfoEnabled() - ); - result.setGlobalResult(r); + // Global stores pose one significant problem + // for IQv2: when they start up, they skip the regular + // ingest pipeline and instead use the "restoration" pipeline + // to read up until the current end offset. Then, they switch + // over to the regular ingest pipeline. + // + // IQv2 position tracking expects to track the position of each + // record from the input topic through the ingest pipeline and then + // get the position headers through the restoration pipeline via the + // changelog topic. The fact that global stores "restore" the input topic + // instead of ingesting it violates our expectations. + // + // It has also caused other problems, so we may want to consider + // switching the global store processing to use the normal paradigm + // rather than adding special-case logic to track positions in global + // stores. + result.setGlobalResult( + QueryResult.forFailure( + FailureReason.UNKNOWN_QUERY_TYPE, + "Global stores do not yet support the KafkaStreams#query API. Use KafkaStreams#store instead." + ) + ); } else { - final Set<Integer> handledPartitions = new HashSet<>(); for (final StreamThread thread : threads) { final Map<TaskId, Task> tasks = thread.allTasks(); @@ -1828,6 +1842,17 @@ public class KafkaStreams implements AutoCloseable { } } + if (!request.isAllPartitions()) { + for (final Integer partition : request.getPartitions()) { + if (!result.getPartitionResults().containsKey(partition)){ + result.addResult(partition, QueryResult.forFailure( + FailureReason.NOT_PRESENT, + "The requested partition was not present at the time of the query." + )); + } + } + } + return result; } diff --git a/streams/src/main/java/org/apache/kafka/streams/query/Position.java b/streams/src/main/java/org/apache/kafka/streams/query/Position.java index 951866c..88f4b49 100644 --- a/streams/src/main/java/org/apache/kafka/streams/query/Position.java +++ b/streams/src/main/java/org/apache/kafka/streams/query/Position.java @@ -93,8 +93,7 @@ public class Position { } /** - * Create a new, structurally independent Position that is the result of merging two other - * Positions. + * Merges the provided Position into the current instance. * <p> * If both Positions contain the same topic -> partition -> offset mapping, the resulting * Position will contain a mapping with the larger of the two offsets. @@ -103,12 +102,10 @@ public class Position { if (other == null) { return this; } else { - final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> copy = - deepCopy(position); for (final Entry<String, ConcurrentHashMap<Integer, Long>> entry : other.position.entrySet()) { final String topic = entry.getKey(); final Map<Integer, Long> partitionMap = - copy.computeIfAbsent(topic, k -> new ConcurrentHashMap<>()); + position.computeIfAbsent(topic, k -> new ConcurrentHashMap<>()); for (final Entry<Integer, Long> partitionOffset : entry.getValue().entrySet()) { final Integer partition = partitionOffset.getKey(); final Long offset = partitionOffset.getValue(); @@ -118,7 +115,7 @@ public class Position { } } } - return new Position(copy); + return this; } } @@ -133,7 +130,8 @@ public class Position { * Return the partition -> offset mapping for a specific topic. */ public Map<Integer, Long> getBound(final String topic) { - return Collections.unmodifiableMap(position.get(topic)); + final ConcurrentHashMap<Integer, Long> bound = position.get(topic); + return bound == null ? Collections.emptyMap() : Collections.unmodifiableMap(bound); } private static ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> deepCopy( diff --git a/streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java b/streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java index 8b93bd6..c5eaa10 100644 --- a/streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java +++ b/streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java @@ -97,11 +97,15 @@ public class StateQueryResult<R> { * prior observations. */ public Position getPosition() { - Position position = Position.emptyPosition(); - for (final QueryResult<R> r : partitionResults.values()) { - position = position.merge(r.getPosition()); + if (globalResult != null) { + return globalResult.getPosition(); + } else { + final Position position = Position.emptyPosition(); + for (final QueryResult<R> r : partitionResults.values()) { + position.merge(r.getPosition()); + } + return position; } - return position; } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java index df22e8c..66683ea 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java @@ -107,10 +107,10 @@ public class CachingKeyValueStore // we can skip flushing to downstream as well as writing to underlying store if (rawNewValue != null || rawOldValue != null) { // we need to get the old values if needed, and then put to store, and then flush - wrapped().put(entry.key(), entry.newValue()); - final ProcessorRecordContext current = context.recordContext(); context.setRecordContext(entry.entry().context()); + wrapped().put(entry.key(), entry.newValue()); + try { flushListener.apply( new Record<>( diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java index 41977cf..5f47c3d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java @@ -45,13 +45,12 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> { private final String name; private final NavigableMap<Bytes, byte[]> map = new TreeMap<>(); + private final Position position = Position.emptyPosition(); private volatile boolean open = false; private StateStoreContext context; - private Position position; public InMemoryKeyValueStore(final String name) { this.name = name; - this.position = Position.emptyPosition(); } @Override @@ -102,7 +101,9 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> { query, positionBound, collectExecutionInfo, - this + this, + position, + context.taskId().partition() ); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java index d09ab0f..cf351e3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java @@ -317,7 +317,14 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> { @Override public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound, final boolean collectExecutionInfo) { - return StoreQueryUtils.handleBasicQueries(query, positionBound, collectExecutionInfo, this); + return StoreQueryUtils.handleBasicQueries( + query, + positionBound, + collectExecutionInfo, + this, + position, + context.taskId().partition() + ); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java index 46f4973..1b49e9e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java @@ -357,7 +357,9 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> { query, positionBound, collectExecutionInfo, - this + this, + position, + context.taskId().partition() ); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java index 22f1215..c81b527 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java @@ -22,6 +22,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; @@ -35,6 +36,9 @@ import java.util.Objects; */ public class MemoryLRUCache implements KeyValueStore<Bytes, byte[]> { + protected StateStoreContext context; + protected Position position = Position.emptyPosition(); + public interface EldestEntryRemovalListener { void apply(Bytes key, byte[] value); } @@ -95,6 +99,7 @@ public class MemoryLRUCache implements KeyValueStore<Bytes, byte[]> { put(Bytes.wrap(key), value); restoring = false; }); + this.context = context; } @Override @@ -122,6 +127,7 @@ public class MemoryLRUCache implements KeyValueStore<Bytes, byte[]> { } else { this.map.put(key, value); } + StoreQueryUtils.updatePosition(position, context); } @Override @@ -144,6 +150,7 @@ public class MemoryLRUCache implements KeyValueStore<Bytes, byte[]> { @Override public synchronized byte[] delete(final Bytes key) { Objects.requireNonNull(key); + StoreQueryUtils.updatePosition(position, context); return this.map.remove(key); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java index eda880e..dd2e8a7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java @@ -121,7 +121,9 @@ public class MemoryNavigableLRUCache extends MemoryLRUCache { query, positionBound, collectExecutionInfo, - this + this, + position, + context.taskId().partition() ); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java index c171792..e6e2740 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java @@ -53,7 +53,14 @@ public class RocksDBSessionStore @Override public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound, final boolean collectExecutionInfo) { - return StoreQueryUtils.handleBasicQueries(query, positionBound, collectExecutionInfo, this); + return StoreQueryUtils.handleBasicQueries( + query, + positionBound, + collectExecutionInfo, + this, + position, + stateStoreContext.taskId().partition() + ); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 0ab9be9..4f07a15 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -326,13 +326,18 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS } @Override - public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound, - final boolean collectExecutionInfo) { + public <R> QueryResult<R> query( + final Query<R> query, + final PositionBound positionBound, + final boolean collectExecutionInfo) { + return StoreQueryUtils.handleBasicQueries( - query, - positionBound, - collectExecutionInfo, - this + query, + positionBound, + collectExecutionInfo, + this, + position, + context.taskId().partition() ); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java index efcde31..40415c2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java @@ -131,7 +131,14 @@ public class RocksDBWindowStore @Override public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound, final boolean collectExecutionInfo) { - return StoreQueryUtils.handleBasicQueries(query, positionBound, collectExecutionInfo, this); + return StoreQueryUtils.handleBasicQueries( + query, + positionBound, + collectExecutionInfo, + this, + position, + stateStoreContext.taskId().partition() + ); } private void maybeUpdateSeqnumForDups() { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java index 6217877..006981e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java @@ -24,6 +24,9 @@ import org.apache.kafka.streams.query.PositionBound; import org.apache.kafka.streams.query.Query; import org.apache.kafka.streams.query.QueryResult; +import java.util.Map; +import java.util.Map.Entry; + public final class StoreQueryUtils { // make this class uninstantiable @@ -35,13 +38,19 @@ public final class StoreQueryUtils { final Query<R> query, final PositionBound positionBound, final boolean collectExecutionInfo, - final StateStore store) { + final StateStore store, + final Position position, + final int partition + ) { final QueryResult<R> result; final long start = collectExecutionInfo ? System.nanoTime() : -1L; - // TODO: position tracking if (query instanceof PingQuery) { - result = (QueryResult<R>) QueryResult.forResult(true); + if (!isPermitted(position, positionBound, partition)) { + result = QueryResult.notUpToBound(position, positionBound, partition); + } else { + result = (QueryResult<R>) QueryResult.forResult(true); + } } else { result = QueryResult.forUnknownQueryType(query, store); } @@ -50,6 +59,7 @@ public final class StoreQueryUtils { "Handled in " + store.getClass() + " in " + (System.nanoTime() - start) + "ns" ); } + result.setPosition(position); return result; } @@ -62,4 +72,36 @@ public final class StoreQueryUtils { position.withComponent(meta.topic(), meta.partition(), meta.offset()); } } + + public static boolean isPermitted( + final Position position, + final PositionBound positionBound, + final int partition + ) { + if (positionBound.isUnbounded()) { + return true; + } else { + final Position bound = positionBound.position(); + for (final String topic : bound.getTopics()) { + final Map<Integer, Long> partitionBounds = bound.getBound(topic); + final Map<Integer, Long> seenPartitionBounds = position.getBound(topic); + if (!partitionBounds.containsKey(partition)) { + // this topic isn't bounded for our partition, so just skip over it. + } else { + if (seenPartitionBounds == null) { + // we haven't seen a topic that is bounded for our partition + return false; + } else if (!seenPartitionBounds.containsKey(partition)) { + // we haven't seen a partition that we have a bound for + return false; + } else if (seenPartitionBounds.get(partition) < partitionBounds.get( + partition)) { + // our current position is behind the bound + return false; + } + } + } + return true; + } + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java index 3a6adaf..62ff17d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java @@ -37,6 +37,7 @@ import org.apache.kafka.streams.kstream.SessionWindows; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.query.FailureReason; import org.apache.kafka.streams.query.Position; +import org.apache.kafka.streams.query.PositionBound; import org.apache.kafka.streams.query.Query; import org.apache.kafka.streams.query.QueryResult; import org.apache.kafka.streams.query.StateQueryRequest; @@ -85,7 +86,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.matchesPattern; import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.nullValue; import static org.junit.jupiter.api.Assertions.assertThrows; @Category({IntegrationTest.class}) @@ -248,6 +248,7 @@ public class IQv2StoreIntegrationTest { public static void before() throws InterruptedException, IOException, ExecutionException, TimeoutException { CLUSTER.start(); + CLUSTER.deleteAllTopicsAndWait(60 * 1000L); final int partitions = 2; CLUSTER.createTopic(INPUT_TOPIC_NAME, partitions, 1); @@ -295,8 +296,14 @@ public class IQv2StoreIntegrationTest { @Before public void beforeTest() { - final StreamsBuilder builder = new StreamsBuilder(); final StoreSupplier<?> supplier = storeToTest.supplier(); + final Properties streamsConfig = streamsConfiguration( + cache, + log, + storeToTest.name() + ); + + final StreamsBuilder builder = new StreamsBuilder(); if (supplier instanceof KeyValueBytesStoreSupplier) { final Materialized<Integer, Integer, KeyValueStore<Bytes, byte[]>> materialized = Materialized.as((KeyValueBytesStoreSupplier) supplier); @@ -389,13 +396,10 @@ public class IQv2StoreIntegrationTest { // Don't need to wait for running, since tests can use iqv2 to wait until they // get a valid response. + kafkaStreams = IntegrationTestUtils.getStartedStreams( - streamsConfiguration( - cache, - log, - supplier.getClass().getSimpleName() - ), + streamsConfig, builder, true ); @@ -414,10 +418,28 @@ public class IQv2StoreIntegrationTest { @Test public void verifyStore() { - shouldRejectUnknownQuery(); - shouldHandlePingQuery(); - shouldCollectExecutionInfo(); - shouldCollectExecutionInfoUnderFailure(); + if (storeToTest.global()) { + globalShouldRejectAllQueries(); + } else { + shouldRejectUnknownQuery(); + shouldHandlePingQuery(); + shouldCollectExecutionInfo(); + shouldCollectExecutionInfoUnderFailure(); + } + } + + private void globalShouldRejectAllQueries() { + final PingQuery query = new PingQuery(); + final StateQueryRequest<Boolean> request = inStore(STORE_NAME).withQuery(query); + + final StateQueryResult<Boolean> result = kafkaStreams.query(request); + + assertThat(result.getGlobalResult().isFailure(), is(true)); + assertThat(result.getGlobalResult().getFailureReason(), + is(FailureReason.UNKNOWN_QUERY_TYPE)); + assertThat(result.getGlobalResult().getFailureMessage(), + is("Global stores do not yet support the KafkaStreams#query API." + + " Use KafkaStreams#store instead.")); } public void shouldRejectUnknownQuery() { @@ -435,7 +457,6 @@ public class IQv2StoreIntegrationTest { queryResult -> { assertThat(queryResult.isFailure(), is(true)); assertThat(queryResult.isSuccess(), is(false)); - assertThat(queryResult.getPosition(), is(nullValue())); assertThat(queryResult.getFailureReason(), is(FailureReason.UNKNOWN_QUERY_TYPE)); assertThat(queryResult.getFailureMessage(), @@ -456,12 +477,18 @@ public class IQv2StoreIntegrationTest { public void shouldHandlePingQuery() { final PingQuery query = new PingQuery(); - final StateQueryRequest<Boolean> request = - inStore(STORE_NAME).withQuery(query); final Set<Integer> partitions = mkSet(0, 1); + final StateQueryRequest<Boolean> request = + inStore(STORE_NAME) + .withQuery(query) + .withPartitions(partitions) + .withPositionBound(PositionBound.at(INPUT_POSITION)); final StateQueryResult<Boolean> result = - IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(kafkaStreams, request, partitions); + IntegrationTestUtils.iqv2WaitForResult( + kafkaStreams, + request + ); makeAssertions( partitions, @@ -473,9 +500,6 @@ public class IQv2StoreIntegrationTest { } assertThat(queryResult.isSuccess(), is(true)); - // TODO: position not implemented - assertThat(queryResult.getPosition(), is(nullValue())); - assertThrows(IllegalArgumentException.class, queryResult::getFailureReason); assertThrows(IllegalArgumentException.class, queryResult::getFailureMessage); @@ -483,17 +507,25 @@ public class IQv2StoreIntegrationTest { assertThat(queryResult.getExecutionInfo(), is(empty())); }); + assertThat(result.getPosition(), is(INPUT_POSITION)); } public void shouldCollectExecutionInfo() { final PingQuery query = new PingQuery(); - final StateQueryRequest<Boolean> request = - inStore(STORE_NAME).withQuery(query).enableExecutionInfo(); final Set<Integer> partitions = mkSet(0, 1); + final StateQueryRequest<Boolean> request = + inStore(STORE_NAME) + .withQuery(query) + .enableExecutionInfo() + .withPartitions(partitions) + .withPositionBound(PositionBound.at(INPUT_POSITION)); final StateQueryResult<Boolean> result = - IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(kafkaStreams, request, partitions); + IntegrationTestUtils.iqv2WaitForResult( + kafkaStreams, + request + ); makeAssertions( partitions, @@ -505,12 +537,19 @@ public class IQv2StoreIntegrationTest { public void shouldCollectExecutionInfoUnderFailure() { final UnknownQuery query = new UnknownQuery(); - final StateQueryRequest<Void> request = - inStore(STORE_NAME).withQuery(query).enableExecutionInfo(); final Set<Integer> partitions = mkSet(0, 1); + final StateQueryRequest<Void> request = + inStore(STORE_NAME) + .withQuery(query) + .enableExecutionInfo() + .withPartitions(partitions) + .withPositionBound(PositionBound.at(INPUT_POSITION)); final StateQueryResult<Void> result = - IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(kafkaStreams, request, partitions); + IntegrationTestUtils.iqv2WaitForResult( + kafkaStreams, + request + ); makeAssertions( partitions, diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index 6cfc5d4..5f527c8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -51,6 +51,8 @@ import org.apache.kafka.streams.processor.internals.StreamThread; import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator; import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentListener; import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper; +import org.apache.kafka.streams.query.FailureReason; +import org.apache.kafka.streams.query.QueryResult; import org.apache.kafka.streams.query.StateQueryRequest; import org.apache.kafka.streams.query.StateQueryResult; import org.apache.kafka.streams.state.QueryableStoreType; @@ -91,6 +93,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; +import static org.apache.kafka.common.utils.Utils.sleep; import static org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout; import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.hamcrest.MatcherAssert.assertThat; @@ -141,6 +144,68 @@ public class IntegrationTestUtils { throw new TimeoutException("The query never returned the desired partitions"); } + /** + * Repeatedly runs the query until the response is valid and then return the response. + * <p> + * Validity in this case means that the response position is up to the specified bound. + * <p> + * Once position bounding is generally supported, we should migrate tests to wait on the + * expected response position. + */ + public static <R> StateQueryResult<R> iqv2WaitForResult( + final KafkaStreams kafkaStreams, + final StateQueryRequest<R> request) { + + final long start = System.currentTimeMillis(); + final long deadline = start + DEFAULT_TIMEOUT; + + StateQueryResult<R> result; + do { + if (Thread.currentThread().isInterrupted()) { + fail("Test was interrupted."); + } + + result = kafkaStreams.query(request); + final LinkedList<QueryResult<R>> allResults = getAllResults(result); + + if (allResults.isEmpty()) { + sleep(100L); + } else { + final boolean needToWait = allResults + .stream() + .anyMatch(IntegrationTestUtils::needToWait); + if (needToWait) { + sleep(100L); + } else { + return result; + } + } + } while (System.currentTimeMillis() < deadline); + + throw new TimeoutException( + "The query never returned within the bound. Last result: " + + result + ); + } + + private static <R> LinkedList<QueryResult<R>> getAllResults( + final StateQueryResult<R> result) { + final LinkedList<QueryResult<R>> allResults = + new LinkedList<>(result.getPartitionResults().values()); + if (result.getGlobalResult() != null) { + allResults.add(result.getGlobalResult()); + } + return allResults; + } + + private static <R> boolean needToWait(final QueryResult<R> queryResult) { + return queryResult.isFailure() + && ( + FailureReason.NOT_UP_TO_BOUND.equals(queryResult.getFailureReason()) + || FailureReason.NOT_PRESENT.equals(queryResult.getFailureReason()) + ); + } + /* * Records state transition for StreamThread */
