This is an automated email from the ASF dual-hosted git repository. vvcephei pushed a commit to branch iqv2-framework in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 4fe22d4553889170600438f809f816b9c3d40a86 Author: John Roesler <[email protected]> AuthorDate: Thu Oct 14 12:53:11 2021 -0500 IQv2 Framework --- .../org/apache/kafka/streams/KafkaStreams.java | 111 +++- .../streams/errors/StreamsStoppedException.java | 38 ++ .../apache/kafka/streams/processor/StateStore.java | 33 ++ .../kafka/streams/processor/StateStoreContext.java | 4 +- .../apache/kafka/streams/query/FailureReason.java | 47 ++ .../org/apache/kafka/streams/query/Position.java | 172 +++++++ .../apache/kafka/streams/query/PositionBound.java | 109 ++++ .../java/org/apache/kafka/streams/query/Query.java | 35 ++ .../apache/kafka/streams/query/QueryResult.java | 210 ++++++++ .../kafka/streams/query/StateQueryRequest.java | 225 +++++++++ .../kafka/streams/query/StateQueryResult.java | 114 +++++ .../apache/kafka/streams/state/StateSerdes.java | 9 + .../org/apache/kafka/streams/state/Stores.java | 19 +- .../InMemoryKeyValueBytesStoreSupplier.java | 45 ++ .../state/internals/InMemoryKeyValueStore.java | 17 + .../state/internals/InMemorySessionStore.java | 9 + .../state/internals/InMemoryWindowStore.java | 14 + ...ValueToTimestampedKeyValueByteStoreAdapter.java | 21 + .../state/internals/MemoryNavigableLRUCache.java | 17 + .../kafka/streams/state/internals/PingQuery.java | 29 ++ .../state/internals/RocksDBSessionStore.java | 9 + .../streams/state/internals/RocksDBStore.java | 14 + .../state/internals/RocksDBWindowStore.java | 9 + .../streams/state/internals/StoreQueryUtils.java | 52 ++ .../internals/TimestampedKeyValueStoreBuilder.java | 17 + .../internals/TimestampedWindowStoreBuilder.java | 17 + .../WindowToTimestampedWindowByteStoreAdapter.java | 20 + .../streams/state/internals/WrappedStateStore.java | 19 + .../streams/integration/IQv2IntegrationTest.java | 433 ++++++++++++++++ .../integration/IQv2StoreIntegrationTest.java | 559 +++++++++++++++++++++ .../integration/utils/IntegrationTestUtils.java | 37 ++ .../kafka/streams/query/PositionBoundTest.java | 96 ++++ 
.../apache/kafka/streams/query/PositionTest.java | 214 ++++++++ .../streams/state/internals/RocksDBStoreTest.java | 2 +- 34 files changed, 2753 insertions(+), 23 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index bb0c40a..ee0e503 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -27,6 +27,7 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.KafkaMetricsContext; @@ -40,27 +41,34 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.errors.InvalidStateStoreException; +import org.apache.kafka.streams.errors.InvalidStateStorePartitionException; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.StreamsNotStartedException; +import org.apache.kafka.streams.errors.StreamsStoppedException; import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; import org.apache.kafka.streams.errors.UnknownStateStoreException; -import org.apache.kafka.streams.errors.InvalidStateStorePartitionException; import org.apache.kafka.streams.internals.metrics.ClientMetrics; import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StreamPartitioner; +import 
org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.ClientUtils; import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; import org.apache.kafka.streams.processor.internals.GlobalStreamThread; -import org.apache.kafka.streams.processor.internals.TopologyMetadata; import org.apache.kafka.streams.processor.internals.StateDirectory; import org.apache.kafka.streams.processor.internals.StreamThread; import org.apache.kafka.streams.processor.internals.StreamsMetadataState; import org.apache.kafka.streams.processor.internals.Task; import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator; +import org.apache.kafka.streams.processor.internals.TopologyMetadata; import org.apache.kafka.streams.processor.internals.assignment.AssignorError; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.query.FailureReason; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.query.StateQueryRequest; +import org.apache.kafka.streams.query.StateQueryResult; import org.apache.kafka.streams.state.HostInfo; import org.apache.kafka.streams.state.internals.GlobalStateStoreProvider; import org.apache.kafka.streams.state.internals.QueryableStoreProvider; @@ -78,17 +86,18 @@ import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.TreeMap; import java.util.UUID; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import 
java.util.function.Consumer; import java.util.stream.Collectors; @@ -1716,4 +1725,100 @@ public class KafkaStreams implements AutoCloseable { return Collections.unmodifiableMap(localStorePartitionLags); } + + /** + * Run an interactive query against a state store. + * <p> + * This method allows callers outside of the Streams runtime to access the internal state of + * stateful processors. See https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html + * for more information. + * + * @param <R> The result type specified by the query. + * @throws StreamsNotStartedException If Streams has not yet been started. Just call {@link + * KafkaStreams#start()} and then retry this call. + * @throws StreamsStoppedException If Streams is in a terminal state like PENDING_SHUTDOWN, + * NOT_RUNNING, PENDING_ERROR, or ERROR. The caller should + * discover a new instance to query. + * @throws UnknownStateStoreException If the specified store name does not exist in the + * topology. + */ + @Evolving + public <R> StateQueryResult<R> query(final StateQueryRequest<R> request) { + final String storeName = request.getStoreName(); + if (!topologyMetadata.hasStore(storeName)) { + throw new UnknownStateStoreException( + "Cannot get state store " + + storeName + + " because no such store is registered in the topology." + ); + } + if (state().hasNotStarted()) { + throw new StreamsNotStartedException( + "KafkaStreams has not been started, you can retry after calling start()." + ); + } + if (state().isShuttingDown() || state.hasCompletedShutdown()) { + throw new StreamsStoppedException( + "KafkaStreams has been stopped (" + state + ")." + + " This instance can no longer serve queries." 
+ ); + } + final StateQueryResult<R> result = new StateQueryResult<>(); + + final Map<String, StateStore> globalStateStores = topologyMetadata.globalStateStores(); + if (globalStateStores.containsKey(storeName)) { + final StateStore store = globalStateStores.get(storeName); + final QueryResult<R> r = + store.query( + request.getQuery(), + request.getPositionBound(), + request.executionInfoEnabled() + ); + result.setGlobalResult(r); + } else { + for (final StreamThread thread : threads) { + final Map<TaskId, Task> tasks = thread.allTasks(); + for (final Entry<TaskId, Task> entry : tasks.entrySet()) { + + final TaskId taskId = entry.getKey(); + final int partition = taskId.partition(); + if (request.isAllPartitions() + || request.getPartitions().contains(partition)) { + final Task task = entry.getValue(); + final StateStore store = task.getStore(storeName); + if (store != null) { + final StreamThread.State state = thread.state(); + final boolean active = task.isActive(); + if (request.isRequireActive() + && (state != StreamThread.State.RUNNING + || !active)) { + result.addResult( + partition, + QueryResult.forFailure( + FailureReason.NOT_UP_TO_BOUND, + "Query requires a running active task," + + " but partition was in state " + + state + " and was " + + (active ? "active" : "not active") + "." + ) + ); + } else { + final QueryResult<R> r = store.query( + request.getQuery(), + request.isRequireActive() + ? 
PositionBound.unbounded() + : request.getPositionBound(), + request.executionInfoEnabled() + ); + result.addResult(partition, r); + } + } + } + } + } + } + + return result; + } + } diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/StreamsStoppedException.java b/streams/src/main/java/org/apache/kafka/streams/errors/StreamsStoppedException.java new file mode 100644 index 0000000..c05708b --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/errors/StreamsStoppedException.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.errors; + +import org.apache.kafka.streams.KafkaStreams.State; + +/** + * Indicates that Kafka Streams is in a terminating or terminal state, such as {@link + * State#PENDING_SHUTDOWN},{@link State#PENDING_ERROR},{@link State#NOT_RUNNING}, or {@link + * State#ERROR}. This Streams instance will need to be discarded and replaced before it can + * serve queries. The caller may wish to query a different instance. 
+ */ +public class StreamsStoppedException extends InvalidStateStoreException { + + private static final long serialVersionUID = 1L; + + public StreamsStoppedException(final String message) { + super(message); + } + + public StreamsStoppedException(final String message, final Throwable throwable) { + super(message, throwable); + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java index 76d1ab4..2f96020 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java @@ -16,9 +16,14 @@ */ package org.apache.kafka.streams.processor; +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter; +import org.apache.kafka.streams.query.FailureReason; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryResult; /** * A storage engine for managing state maintained by a stream processor. @@ -119,4 +124,32 @@ public interface StateStore { * @return {@code true} if the store is open */ boolean isOpen(); + + /** + * Execute a query. Returns a QueryResult containing either result data or + * a failure. + * <p> + * If the store doesn't know how to handle the given query, the result + * shall be a {@link FailureReason#UNKNOWN_QUERY_TYPE}. + * If the store couldn't satisfy the given position bound, the result + * shall be a {@link FailureReason#NOT_UP_TO_BOUND}. 
+ * <p> + * Note to store implementers: if your store does not support position tracking, + * you can correctly respond {@link FailureReason#NOT_UP_TO_BOUND} if the argument is + * anything but {@link PositionBound#unbounded()}. Be sure to explain in the failure message + * that bounded positions are not supported. + * <p> + * @param query The query to execute + * @param positionBound The position the store must be at or past + * @param collectExecutionInfo Whether the store should collect detailed execution info for the query + * @param <R> The result type + */ + @Evolving + default <R> QueryResult<R> query( + Query<R> query, + PositionBound positionBound, + boolean collectExecutionInfo) { + // If a store doesn't implement a query handler, then all queries are unknown. + return QueryResult.forUnknownQueryType(query, this); + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java index 50d5879..f6f1446 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java @@ -46,8 +46,8 @@ public interface StateStoreContext { /** * Return the metadata of the current topic/partition/offset if available. - * This is defined as the metadata of the record that is currently been - * processed by the StreamTask that holds the store. + * This is defined as the metadata of the record that is currently being + * processed (or was last processed) by the StreamTask that holds the store. * <p> * Note that the metadata is not defined during all store interactions, for * example, while the StreamTask is running a punctuation. 
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java b/streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java new file mode 100644 index 0000000..97e342d --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + + +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; + +@Evolving +public enum FailureReason { + /** + * Failure indicating that the store doesn't know how to handle the given query. + */ + UNKNOWN_QUERY_TYPE, + + /** + * Failure indicating that the store partition is not (yet) up to the desired bound. + * The caller should either try again later or try a different replica. + */ + NOT_UP_TO_BOUND, + + /** + * Failure indicating that the requested store partition is not present on the local + * KafkaStreams instance. It may have been migrated to another instance during a rebalance. + * The caller is recommended to try a different replica. + */ + NOT_PRESENT, + + /** + * The requested store partition does not exist at all. 
For example, partition 4 was requested, + * but the store in question only has 4 partitions (0 through 3). + */ + DOES_NOT_EXIST; +} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/query/Position.java b/streams/src/main/java/org/apache/kafka/streams/query/Position.java new file mode 100644 index 0000000..48e524f --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/query/Position.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + + +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; + +import java.util.Collections; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * A representation of a position vector with respect to a set of topic partitions. For example, in + * Interactive Query ({@link org.apache.kafka.streams.KafkaStreams#query(StateQueryRequest)}), a + * query result may contain information from multiple store partitions, each of which contains + * information from multiple input topics' partitions.
This class can be used to summarize all of + * that positional information. + * <p> + * This class is threadsafe, although it is mutable. Readers are recommended to use {@link + * Position#copy()} to avoid seeing mutations to the Position after they get the reference. For + * example, when a store executes a {@link org.apache.kafka.streams.processor.StateStore#query(Query, + * PositionBound, boolean)} request and returns its current position via {@link + * QueryResult#setPosition(Position)}, it should pass a copy of its position instead of the mutable + * reference. + */ +@Evolving +public class Position { + + private final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> position; + + private Position(final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> position) { + this.position = position; + } + + /** + * Create a new, empty Position. + */ + public static Position emptyPosition() { + return new Position(new ConcurrentHashMap<>()); + } + + /** + * Create a new Position and populate it with a mapping of topic -> partition -> offset. + * <p> + * Note, the resulting Position does not share any structure with the provided map, so + * subsequent changes to the map or Position will not affect the other. + */ + public static Position fromMap(final Map<String, ? extends Map<Integer, Long>> map) { + return new Position(deepCopy(map)); + } + + /** + * Augment an existing Position by setting a new offset for a topic and partition. + * <p> + * Returns a self-reference for chained calls. Note: this method mutates the Position. Note + * also: this method does not enforce anything about the arguments, except that the topic must + * not be null. It is the caller's responsibility to enforce desired semantics and validity.
+ */ + public Position withComponent(final String topic, final int partition, final long offset) { + position + .computeIfAbsent(topic, k -> new ConcurrentHashMap<>()) + .put(partition, offset); + return this; + } + + /** + * Create a deep copy of the Position. + */ + public Position copy() { + return new Position(deepCopy(position)); + } + + /** + * Create a new, structurally independent Position that is the result of merging two other + * Positions. + * <p> + * If both Positions contain the same topic -> partition -> offset mapping, the resulting + * Position will contain a mapping with the larger of the two offsets. + */ + public Position merge(final Position other) { + if (other == null) { + return this; + } else { + final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> copy = + deepCopy(position); + for (final Entry<String, ConcurrentHashMap<Integer, Long>> entry : other.position.entrySet()) { + final String topic = entry.getKey(); + final Map<Integer, Long> partitionMap = + copy.computeIfAbsent(topic, k -> new ConcurrentHashMap<>()); + for (final Entry<Integer, Long> partitionOffset : entry.getValue().entrySet()) { + final Integer partition = partitionOffset.getKey(); + final Long offset = partitionOffset.getValue(); + if (!partitionMap.containsKey(partition) + || partitionMap.get(partition) < offset) { + partitionMap.put(partition, offset); + } + } + } + return new Position(copy); + } + } + + /** + * Return the topics that are represented in this Position. + */ + public Set<String> getTopics() { + return Collections.unmodifiableSet(position.keySet()); + } + + /** + * Return the partition -> offset mapping for a specific topic. + */ + public Map<Integer, Long> getBound(final String topic) { + return Collections.unmodifiableMap(position.get(topic)); + } + + private static ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> deepCopy( + final Map<String, ? 
extends Map<Integer, Long>> map) { + if (map == null) { + return new ConcurrentHashMap<>(); + } else { + final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> copy = + new ConcurrentHashMap<>(map.size()); + for (final Entry<String, ? extends Map<Integer, Long>> entry : map.entrySet()) { + copy.put(entry.getKey(), new ConcurrentHashMap<>(entry.getValue())); + } + return copy; + } + } + + @Override + public String toString() { + return "Position{" + + "position=" + position + + '}'; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final Position position1 = (Position) o; + return Objects.equals(position, position1.position); + } + + @Override + public int hashCode() { + throw new UnsupportedOperationException( + "This mutable object is not suitable as a hash key"); + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java b/streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java new file mode 100644 index 0000000..46d9376 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + + +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; + +import java.util.Objects; + +/** + * A class bounding the processing state {@link Position} during queries. This can be used to + * specify that a query should fail if the locally available partition isn't caught up to the + * specified bound. "Unbounded" places no restrictions on the current location of the partition. + */ +@Evolving +public class PositionBound { + + private final Position position; + private final boolean unbounded; + + private PositionBound(final Position position, final boolean unbounded) { + if (unbounded && position != null) { + throw new IllegalArgumentException(); + } else if (position != null) { + this.position = position.copy(); + this.unbounded = false; + } else { + this.position = null; + this.unbounded = unbounded; + } + } + + /** + * Creates a new PositionBound representing "no bound" + */ + public static PositionBound unbounded() { + return new PositionBound(null, true); + } + + /** + * Creates a new PositionBound representing a specific position. + */ + public static PositionBound at(final Position position) { + return new PositionBound(position, false); + } + + /** + * Returns true iff this object specifies that there is no position bound. + */ + public boolean isUnbounded() { + return unbounded; + } + + /** + * Returns the specific position of this bound. + * + * @throws IllegalArgumentException if this is an "unbounded" position. + */ + public Position position() { + if (unbounded) { + throw new IllegalArgumentException( + "Cannot get the position of an unbounded PositionBound." 
+ ); + } else { + return position; + } + } + + @Override + public String toString() { + if (isUnbounded()) { + return "PositionBound{unbounded}"; + } else { + return "PositionBound{position=" + position + '}'; + } + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final PositionBound that = (PositionBound) o; + return unbounded == that.unbounded && Objects.equals(position, that.position); + } + + @Override + public int hashCode() { + throw new UnsupportedOperationException( + "This mutable object is not suitable as a hash key"); + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/query/Query.java b/streams/src/main/java/org/apache/kafka/streams/query/Query.java new file mode 100644 index 0000000..ad77a91 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/query/Query.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + +/** + * Marker interface that all interactive queries must implement (see {@link + * org.apache.kafka.streams.KafkaStreams#query(StateQueryRequest)}). 
+ * <p> + * You can find all available queries by searching for classes implementing this interface. + * <p> + * Kafka Streams will pass unknown query types straight through into the bytes stores, so callers + * can add custom queries by implementing this interface and providing custom stores that handle + * them (via {@link org.apache.kafka.streams.state.StoreSupplier}s). + * <p> + * See KIP-796 (https://cwiki.apache.org/confluence/x/34xnCw) for more details. + * + * @param <R> The type of the result returned by this query. + */ +public interface Query<R> { + +} diff --git a/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java b/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java new file mode 100644 index 0000000..780ea86 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + + +import org.apache.kafka.streams.processor.StateStore; + +import java.util.LinkedList; +import java.util.List; + +/** + * Container for a single partition's result when executing a {@link StateQueryRequest}. + * + * @param <R> The result type of the query.
+ */ +public final class QueryResult<R> { + + private final List<String> executionInfo = new LinkedList<>(); + private final FailureReason failureReason; + private final String failure; + private final R result; + private Position position; + + private QueryResult(final R result) { + this.result = result; + this.failureReason = null; + this.failure = null; + } + + private QueryResult(final FailureReason failureReason, final String failure) { + this.result = null; + this.failureReason = failureReason; + this.failure = failure; + } + + /** + * Static factory method to create a result object for a successful query. Used by StateStores + * to respond to a {@link StateStore#query(Query, PositionBound, boolean)}. + */ + public static <R> QueryResult<R> forResult(final R result) { + return new QueryResult<>(result); + } + + /** + * Static factory method to create a result object for a failed query. Used by StateStores to + * respond to a {@link StateStore#query(Query, PositionBound, boolean)}. + */ + public static <R> QueryResult<R> forFailure( + final FailureReason failureReason, + final String failureMessage) { + + return new QueryResult<>(failureReason, failureMessage); + } + + /** + * Static factory method to create a failed query result object to indicate that the store does + * not know how to handle the query. + * <p> + * Used by StateStores to respond to a {@link StateStore#query(Query, PositionBound, boolean)}. + */ + public static <R> QueryResult<R> forUnknownQueryType( + final Query<R> query, + final StateStore store) { + + return new QueryResult<>( + FailureReason.UNKNOWN_QUERY_TYPE, + "This store (" + store.getClass() + ") doesn't know how to execute " + + "the given query (" + query + ")." + + " Contact the store maintainer if you need support for a new query type."); + } + + /** + * Static factory method to create a failed query result object to indicate that the store has + * not yet caught up to the requested position bound. 
+ * <p> + * Used by StateStores to respond to a {@link StateStore#query(Query, PositionBound, boolean)}. + */ + public static <R> QueryResult<R> notUpToBound( + final Position currentPosition, + final PositionBound positionBound, + final int partition) { + + return new QueryResult<>( + FailureReason.NOT_UP_TO_BOUND, + "For store partition " + partition + ", the current position " + + currentPosition + " is not yet up to the bound " + + positionBound + ); + } + + /** + * Used by stores to add detailed execution information (if requested) during query execution. + */ + public void addExecutionInfo(final String message) { + executionInfo.add(message); + } + + /** + * Used by stores to report what exact position in the store's history it was at when it + * executed the query. + */ + public void setPosition(final Position position) { + this.position = position; + } + + /** + * True iff the query was successfully executed. The response is available in {@link + * #getResult()}. + */ + public boolean isSuccess() { + return failureReason == null; + } + + + /** + * True iff the query execution failed. More information about the failure is available in + * {@link #getFailureReason()} and {@link #getFailureMessage()}. + */ + public boolean isFailure() { + return failureReason != null; + } + + /** + * If detailed execution information was requested in {@link StateQueryRequest#enableExecutionInfo()}, + * this method returns the execution details for this partition's result. + */ + public List<String> getExecutionInfo() { + return executionInfo; + } + + /** + * This state partition's exact position in its history when this query was executed. Can be + * used in conjunction with subsequent queries via {@link StateQueryRequest#withPositionBound(PositionBound)}. + * <p> + * Note: stores are encouraged, but not required to set this property.
+ */ + public Position getPosition() { + return position; + } + + /** + * If this partition failed to execute the query, returns the reason. + * + * @throws IllegalArgumentException if this is not a failed result. + */ + public FailureReason getFailureReason() { + if (!isFailure()) { + throw new IllegalArgumentException( + "Cannot get failure reason because this query did not fail." + ); + } + return failureReason; + } + + /** + * If this partition failed to execute the query, returns the failure message. + * + * @throws IllegalArgumentException if this is not a failed result. + */ + public String getFailureMessage() { + if (!isFailure()) { + throw new IllegalArgumentException( + "Cannot get failure message because this query did not fail." + ); + } + return failure; + } + + /** + * Returns the result of executing the query on one partition. The result type is determined by + * the query. Note: queries may choose to return {@code null} for a successful query, so {@link + * #isSuccess()} and {@link #isFailure()} must be used to determine whether the query + * was successful or failed on this partition. + * + * @throws IllegalArgumentException if this is not a successful query. + */ + public R getResult() { + if (!isSuccess()) { + throw new IllegalArgumentException( + "Cannot get result for failed query. 
Failure is " + failureReason.name() + ": " + + failure); + } + return result; + } + + @Override + public String toString() { + return "QueryResult{" + + "executionInfo=" + executionInfo + + ", failureReason=" + failureReason + + ", failure='" + failure + '\'' + + ", result=" + result + + ", position=" + position + + '}'; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/query/StateQueryRequest.java b/streams/src/main/java/org/apache/kafka/streams/query/StateQueryRequest.java new file mode 100644 index 0000000..eb99b79 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/query/StateQueryRequest.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; + +/** + * The request object for Interactive Queries. This is an immutable builder class for passing all + * required and optional arguments for querying a state store in Kafka Streams. + * <p> + * + * @param <R> The type of the query result. 
+ */ +@Evolving +public class StateQueryRequest<R> { + + private final String storeName; + private final PositionBound position; + private final Optional<Set<Integer>> partitions; + private final Query<R> query; + private final boolean executionInfoEnabled; + private final boolean requireActive; + + private StateQueryRequest( + final String storeName, + final PositionBound position, + final Optional<Set<Integer>> partitions, + final Query<R> query, + final boolean executionInfoEnabled, + final boolean requireActive) { + + this.storeName = storeName; + this.position = position; + this.partitions = partitions; + this.query = query; + this.executionInfoEnabled = executionInfoEnabled; + this.requireActive = requireActive; + } + + /** + * Specifies the name of the store to query. + */ + public static InStore inStore(final String name) { + return new InStore(name); + } + + /** + * Bounds the position of the state store against its input topics. + */ + public StateQueryRequest<R> withPositionBound(final PositionBound positionBound) { + return new StateQueryRequest<>( + storeName, + positionBound, + partitions, + query, + executionInfoEnabled, + requireActive + ); + } + + + /** + * Specifies that the query will run against all locally available partitions. + */ + public StateQueryRequest<R> withAllPartitions() { + return new StateQueryRequest<>( + storeName, + position, + Optional.empty(), + query, + executionInfoEnabled, + requireActive + ); + } + + /** + * Specifies a set of partitions to run against. If some partitions are not locally available, + * the response will contain a {@link FailureReason#NOT_PRESENT} for those partitions. If some + * partitions in this set are not valid partitions for the store, the response will contain a + * {@link FailureReason#DOES_NOT_EXIST} for those partitions. 
+ */ + public StateQueryRequest<R> withPartitions(final Set<Integer> partitions) { + return new StateQueryRequest<>( + storeName, + position, + Optional.of(Collections.unmodifiableSet(new HashSet<>(partitions))), + query, + executionInfoEnabled, + requireActive + ); + } + + /** + * Requests for stores and the Streams runtime to record any useful details about how the query + * was executed. + */ + public StateQueryRequest<R> enableExecutionInfo() { + return new StateQueryRequest<>( + storeName, + position, + partitions, + query, + true, + requireActive + ); + } + + /** + * Specifies that this query should only run on partitions for which this instance is the leader + * (aka "active"). Partitions for which this instance is not the active replica will return + * {@link FailureReason#NOT_UP_TO_BOUND}. + */ + public StateQueryRequest<R> requireActive() { + return new StateQueryRequest<>( + storeName, + position, + partitions, + query, + executionInfoEnabled, + true + ); + } + + /** + * The name of the store this request is for. + */ + public String getStoreName() { + return storeName; + } + + /** + * The bound that this request places on its query, in terms of the partitions' positions + * against its inputs. + */ + public PositionBound getPositionBound() { + return position; + } + + /** + * The query this request is meant to run. + */ + public Query<R> getQuery() { + return query; + } + + /** + * Whether this request should fetch from all locally available partitions. + */ + public boolean isAllPartitions() { + return !partitions.isPresent(); + } + + /** + * If the request is for specific partitions, return the set of partitions to query. 
+ * + * @throws IllegalStateException if this is a request for all partitions + */ + public Set<Integer> getPartitions() { + if (!partitions.isPresent()) { + throw new IllegalStateException( + "Cannot list partitions of an 'all partitions' request"); + } else { + return partitions.get(); + } + } + + /** + * Whether the request includes detailed execution information. + */ + public boolean executionInfoEnabled() { + return executionInfoEnabled; + } + + /** + * Whether this request requires the query to execute only on active partitions. + */ + public boolean isRequireActive() { + return requireActive; + } + + /** + * A progressive builder interface for creating {@code StateQueryRequest}s. + */ + public static class InStore { + + private final String name; + + private InStore(final String name) { + this.name = name; + } + + /** + * Specifies the query to run on the specified store. + */ + public <R> StateQueryRequest<R> withQuery(final Query<R> query) { + return new StateQueryRequest<>( + name, // name is already specified + PositionBound.unbounded(), // default: unbounded + Optional.empty(), // default: all partitions + query, // the query is specified + false, // default: no execution info + false // default: don't require active + ); + } + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java b/streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java new file mode 100644 index 0000000..8b93bd6 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * The response object for interactive queries. This wraps the individual partition results, as well + * as metadata relating to the result as a whole. + * <p> + * + * @param <R> The type of the query result. + */ +@Evolving +public class StateQueryResult<R> { + + private final Map<Integer, QueryResult<R>> partitionResults = new HashMap<>(); + private QueryResult<R> globalResult = null; + + /** + * Set the result for a global store query. Used by Kafka Streams and available for tests. + */ + public void setGlobalResult(final QueryResult<R> r) { + this.globalResult = r; + } + + /** + * Set the result for a partitioned store query. Used by Kafka Streams and available for tests. + */ + public void addResult(final int partition, final QueryResult<R> r) { + partitionResults.put(partition, r); + } + + + /** + * The query's result for each partition that executed the query. Empty for global store + * queries. + */ + public Map<Integer, QueryResult<R>> getPartitionResults() { + return partitionResults; + } + + /** + * For queries that are expected to match records in only one partition, returns the result. + * + * @throws IllegalArgumentException if the results are not for exactly one partition. 
+ */ + public QueryResult<R> getOnlyPartitionResult() { + final List<QueryResult<R>> nonempty = + partitionResults + .values() + .stream() + .filter(r -> r.getResult() != null) + .collect(Collectors.toList()); + + if (nonempty.size() != 1) { + throw new IllegalArgumentException( + "The query did not return exactly one partition result: " + partitionResults + ); + } else { + return nonempty.get(0); + } + } + + /** + * The query's result for global store queries. Is {@code null} for non-global (partitioned) + * store queries. + */ + public QueryResult<R> getGlobalResult() { + return globalResult; + } + + /** + * The position of the state store at the moment it executed the query. In conjunction with + * {@link StateQueryRequest#withPositionBound}, this can be used to achieve a good balance + * between consistency and availability in which repeated queries are guaranteed to advance in + * time while allowing reads to be served from any replica that is caught up to that caller's + * prior observations. 
+ */ + public Position getPosition() { + Position position = Position.emptyPosition(); + for (final QueryResult<R> r : partitionResults.values()) { + position = position.merge(r.getPosition()); + } + return position; + } + + @Override + public String toString() { + return "StateQueryResult{" + + "partitionResults=" + partitionResults + + ", globalResult=" + globalResult + + '}'; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java index f9f0bdc..da7927e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java @@ -212,4 +212,13 @@ public final class StateSerdes<K, V> { e); } } + + @Override + public String toString() { + return "StateSerdes{" + + "topic='" + topic + '\'' + + ", keySerde=" + keySerde + + ", valueSerde=" + valueSerde + + '}'; + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java index bf4e5aa..17fbbb4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java @@ -19,7 +19,7 @@ package org.apache.kafka.streams.state; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore; +import org.apache.kafka.streams.state.internals.InMemoryKeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.internals.InMemorySessionBytesStoreSupplier; import org.apache.kafka.streams.state.internals.InMemoryWindowBytesStoreSupplier; import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder; @@ -122,22 +122,7 @@ public final class Stores { */ public static KeyValueBytesStoreSupplier inMemoryKeyValueStore(final 
String name) { Objects.requireNonNull(name, "name cannot be null"); - return new KeyValueBytesStoreSupplier() { - @Override - public String name() { - return name; - } - - @Override - public KeyValueStore<Bytes, byte[]> get() { - return new InMemoryKeyValueStore(name); - } - - @Override - public String metricsScope() { - return "in-memory"; - } - }; + return new InMemoryKeyValueBytesStoreSupplier(name); } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueBytesStoreSupplier.java new file mode 100644 index 0000000..ef0f96a --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueBytesStoreSupplier.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.KeyValueStore; + +public class InMemoryKeyValueBytesStoreSupplier implements KeyValueBytesStoreSupplier { + + private final String name; + + public InMemoryKeyValueBytesStoreSupplier(final String name) { + this.name = name; + } + + @Override + public String name() { + return name; + } + + @Override + public KeyValueStore<Bytes, byte[]> get() { + return new InMemoryKeyValueStore(name); + } + + @Override + public String metricsScope() { + return "in-memory"; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java index cd6d29f..7b190c1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java @@ -24,6 +24,9 @@ import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryResult; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.slf4j.Logger; @@ -90,6 +93,20 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> { } @Override + public <R> QueryResult<R> query( + final Query<R> query, + final PositionBound positionBound, + final boolean collectExecutionInfo) { + + return StoreQueryUtils.handleBasicQueries( + query, + positionBound, + collectExecutionInfo, 
+ this + ); + } + + @Override public synchronized byte[] get(final Bytes key) { return map.get(key); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java index bc8cda6..03b4594 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java @@ -27,6 +27,9 @@ import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryResult; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.SessionStore; import org.slf4j.Logger; @@ -292,6 +295,12 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> { } @Override + public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound, + final boolean collectExecutionInfo) { + return StoreQueryUtils.handleBasicQueries(query, positionBound, collectExecutionInfo, this); + } + + @Override public void flush() { // do-nothing since it is in-memory } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java index 5327e75..b07ded5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java @@ -27,6 +27,9 @@ import org.apache.kafka.streams.processor.StateStore; 
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryResult; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; @@ -328,6 +331,17 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> { } @Override + public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound, + final boolean collectExecutionInfo) { + return StoreQueryUtils.handleBasicQueries( + query, + positionBound, + collectExecutionInfo, + this + ); + } + + @Override public void flush() { // do-nothing since it is in-memory } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java index d9b42c2..e9cdece 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java @@ -22,6 +22,9 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryResult; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.KeyValueIterator; import 
org.apache.kafka.streams.state.KeyValueStore; @@ -116,6 +119,24 @@ public class KeyValueToTimestampedKeyValueByteStoreAdapter implements KeyValueSt } @Override + public <R> QueryResult<R> query( + final Query<R> query, + final PositionBound positionBound, + final boolean collectExecutionInfo) { + + + final long start = collectExecutionInfo ? System.nanoTime() : -1L; + final QueryResult<R> result = store.query(query, positionBound, collectExecutionInfo); + if (collectExecutionInfo) { + final long end = System.nanoTime(); + result.addExecutionInfo( + "Handled in " + getClass() + " in " + (end - start) + "ns" + ); + } + return result; + } + + @Override public byte[] get(final Bytes key) { return convertToTimestampedFormat(store.get(key)); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java index 84f46ad..eda880e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java @@ -19,6 +19,9 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryResult; import org.apache.kafka.streams.state.KeyValueIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -108,6 +111,20 @@ public class MemoryNavigableLRUCache extends MemoryLRUCache { return new TreeMap<>(this.map); } + @Override + public <R> QueryResult<R> query( + final Query<R> query, + final PositionBound positionBound, + final boolean collectExecutionInfo) { + + return StoreQueryUtils.handleBasicQueries( + query, + positionBound, + 
collectExecutionInfo, + this + ); + } + private static class CacheIterator implements KeyValueIterator<Bytes, byte[]> { private final Iterator<Bytes> keys; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/PingQuery.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/PingQuery.java new file mode 100644 index 0000000..1eaf128 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/PingQuery.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.streams.query.Query; + +/** + * A very simple query that all stores can handle to verify that the store is participating in the + * IQv2 framework properly. + * <p> + * This is not a public API and may change without notice. 
+ */ +public class PingQuery implements Query<Boolean> { + +} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java index f5d7108..a7f7eb3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java @@ -18,6 +18,9 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryResult; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.SessionStore; @@ -31,6 +34,12 @@ public class RocksDBSessionStore } @Override + public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound, + final boolean collectExecutionInfo) { + return StoreQueryUtils.handleBasicQueries(query, positionBound, collectExecutionInfo, this); + } + + @Override public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes key, final long earliestSessionEndTime, final long latestSessionStartTime) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 93dd93b..117b9bd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -29,6 +29,9 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.api.RecordMetadata; +import 
org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryResult; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.RocksDBConfigSetter; @@ -109,6 +112,17 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS private StateStoreContext context; private Position position; + @Override + public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound, + final boolean collectExecutionInfo) { + return StoreQueryUtils.handleBasicQueries( + query, + positionBound, + collectExecutionInfo, + this + ); + } + RocksDBStore(final String name, final String metricsScope) { this(name, DB_FILE_DIR, new RocksDBMetricsRecorder(metricsScope, name)); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java index 8f48dca..3e7a43f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java @@ -18,6 +18,9 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryResult; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; @@ -112,4 +115,10 @@ public class RocksDBWindowStore seqnum = (seqnum + 1) & 0x7FFFFFFF; } } + + @Override + public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound, + final boolean collectExecutionInfo) { + return 
StoreQueryUtils.handleBasicQueries(query, positionBound, collectExecutionInfo, this); + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java new file mode 100644 index 0000000..b1d448a --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryResult; + +public final class StoreQueryUtils { + + // make this class uninstantiable + private StoreQueryUtils() { + } + + @SuppressWarnings("unchecked") + public static <R> QueryResult<R> handleBasicQueries( + final Query<R> query, + final PositionBound positionBound, + final boolean collectExecutionInfo, + final StateStore store) { + + final QueryResult<R> result; + final long start = collectExecutionInfo ? 
System.nanoTime() : -1L; + // TODO: position tracking + if (query instanceof PingQuery) { + result = (QueryResult<R>) QueryResult.forResult(true); + } else { + result = QueryResult.forUnknownQueryType(query, store); + } + if (collectExecutionInfo) { + result.addExecutionInfo( + "Handled in " + store.getClass() + " in " + (System.nanoTime() - start) + "ns" + ); + } + return result; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java index a249a14..f6b3447 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java @@ -24,6 +24,9 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryResult; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; @@ -185,6 +188,20 @@ public class TimestampedKeyValueStoreBuilder<K, V> } @Override + public <R> QueryResult<R> query(final Query<R> query, + final PositionBound positionBound, + final boolean collectExecutionInfo) { + + final long start = collectExecutionInfo ? 
System.nanoTime() : -1L; + final QueryResult<R> result = wrapped.query(query, positionBound, collectExecutionInfo); + if (collectExecutionInfo) { + final long end = System.nanoTime(); + result.addExecutionInfo("Handled in " + getClass() + " in " + (end - start) + "ns"); + } + return result; + } + + @Override public String name() { return wrapped.name(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java index b3727f5..840274a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java @@ -23,6 +23,9 @@ import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryResult; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.TimestampedBytesStore; import org.apache.kafka.streams.state.TimestampedWindowStore; @@ -203,6 +206,20 @@ public class TimestampedWindowStoreBuilder<K, V> } @Override + public <R> QueryResult<R> query(final Query<R> query, + final PositionBound positionBound, + final boolean collectExecutionInfo) { + + final long start = collectExecutionInfo ? 
System.nanoTime() : -1L; + final QueryResult<R> result = wrapped.query(query, positionBound, collectExecutionInfo); + if (collectExecutionInfo) { + final long end = System.nanoTime(); + result.addExecutionInfo("Handled in " + getClass() + " in " + (end - start) + "ns"); + } + return result; + } + + @Override public String name() { return wrapped.name(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java index f7999d3..88549ae 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java @@ -21,6 +21,9 @@ import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryResult; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; @@ -183,6 +186,23 @@ class WindowToTimestampedWindowByteStoreAdapter implements WindowStore<Bytes, by return store.isOpen(); } + @Override + public <R> QueryResult<R> query( + final Query<R> query, + final PositionBound positionBound, + final boolean collectExecutionInfo) { + + final long start = collectExecutionInfo ? 
System.nanoTime() : -1L; + final QueryResult<R> result = store.query(query, positionBound, collectExecutionInfo); + if (collectExecutionInfo) { + final long end = System.nanoTime(); + result.addExecutionInfo( + "Handled in " + getClass() + " in " + (end - start) + "ns" + ); + } + return result; + } + private static class WindowToTimestampedWindowIteratorAdapter extends KeyValueToTimestampedKeyValueIteratorAdapter<Long> diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java index e8244f7..7531b38 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java @@ -20,6 +20,9 @@ import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryResult; import org.apache.kafka.streams.state.TimestampedBytesStore; /** @@ -103,6 +106,22 @@ public abstract class WrappedStateStore<S extends StateStore, K, V> implements S wrapped.close(); } + @Override + public <R> QueryResult<R> query(final Query<R> query, + final PositionBound positionBound, + final boolean collectExecutionInfo) { + + final long start = collectExecutionInfo ? 
System.nanoTime() : -1L; + final QueryResult<R> result = wrapped().query(query, positionBound, collectExecutionInfo); + if (collectExecutionInfo) { + final long end = System.nanoTime(); + result.addExecutionInfo( + "Handled in " + getClass() + " via WrappedStateStore" + " in " + (end - start) + + "ns"); + } + return result; + } + public S wrapped() { return wrapped; } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java new file mode 100644 index 0000000..7d33c74 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java @@ -0,0 +1,433 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.StreamsNotStartedException; +import org.apache.kafka.streams.errors.StreamsStoppedException; +import org.apache.kafka.streams.errors.UnknownStateStoreException; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.StreamThread; +import org.apache.kafka.streams.processor.internals.StreamThread.State; +import org.apache.kafka.streams.query.FailureReason; +import org.apache.kafka.streams.query.Position; +import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.query.StateQueryRequest; +import org.apache.kafka.streams.query.StateQueryResult; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; +import 
org.apache.kafka.streams.state.internals.PingQuery; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.time.Duration; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static java.util.Collections.singleton; +import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.apache.kafka.streams.query.StateQueryRequest.inStore; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.matchesPattern; +import static org.junit.jupiter.api.Assertions.assertThrows; + +@Category({IntegrationTest.class}) +public class IQv2IntegrationTest { + + private static final int NUM_BROKERS = 1; + public static final Duration WINDOW_SIZE = Duration.ofMinutes(5); + private static int port = 0; + private static final String INPUT_TOPIC_NAME = "input-topic"; + private static final Position INPUT_POSITION = Position.emptyPosition(); + private static final String STORE_NAME = "kv-store"; + + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); + + private KafkaStreams kafkaStreams; + + @BeforeClass + public static void before() + throws InterruptedException, IOException, ExecutionException, TimeoutException { + CLUSTER.start(); + final int partitions = 2; + CLUSTER.createTopic(INPUT_TOPIC_NAME, partitions, 1); + + final Properties producerProps 
= new Properties(); + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); + + final List<Future<RecordMetadata>> futures = new LinkedList<>(); + try (final Producer<Integer, Integer> producer = new KafkaProducer<>(producerProps)) { + for (int i = 0; i < 3; i++) { + final Future<RecordMetadata> send = producer.send( + new ProducerRecord<>( + INPUT_TOPIC_NAME, + i % partitions, + Time.SYSTEM.milliseconds(), + i, + i, + null + ) + ); + futures.add(send); + Time.SYSTEM.sleep(1L); + } + producer.flush(); + + for (final Future<RecordMetadata> future : futures) { + final RecordMetadata recordMetadata = future.get(1, TimeUnit.MINUTES); + assertThat(recordMetadata.hasOffset(), is(true)); + INPUT_POSITION.withComponent( + recordMetadata.topic(), + recordMetadata.partition(), + recordMetadata.offset() + ); + } + } + + assertThat(INPUT_POSITION, equalTo( + Position + .emptyPosition() + .withComponent(INPUT_TOPIC_NAME, 0, 1L) + .withComponent(INPUT_TOPIC_NAME, 1, 0L) + )); + } + + @Before + public void beforeTest() { + final StreamsBuilder builder = new StreamsBuilder(); + + builder.table( + INPUT_TOPIC_NAME, + Consumed.with(Serdes.Integer(), Serdes.Integer()), + Materialized.as(STORE_NAME) + ); + + kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration()); + kafkaStreams.cleanUp(); + } + + @After + public void afterTest() { + kafkaStreams.close(); + kafkaStreams.cleanUp(); + } + + @AfterClass + public static void after() { + CLUSTER.stop(); + } + + @Test + public void shouldFailUnknownStore() { + final PingQuery query = new PingQuery(); + final StateQueryRequest<Boolean> request = + inStore("unknown-store").withQuery(query); + + assertThrows(UnknownStateStoreException.class, () -> kafkaStreams.query(request)); + } + + @Test + public void 
shouldFailNotStarted() { + final PingQuery query = new PingQuery(); + final StateQueryRequest<Boolean> request = + inStore(STORE_NAME).withQuery(query); + + assertThrows(StreamsNotStartedException.class, () -> kafkaStreams.query(request)); + } + + @Test + public void shouldFailStopped() { + final PingQuery query = new PingQuery(); + final StateQueryRequest<Boolean> request = + inStore(STORE_NAME).withQuery(query); + + kafkaStreams.start(); + kafkaStreams.close(Duration.ZERO); + assertThrows(StreamsStoppedException.class, () -> kafkaStreams.query(request)); + } + + @Test + public void shouldRejectNonRunningActive() + throws NoSuchFieldException, IllegalAccessException { + final PingQuery query = new PingQuery(); + final StateQueryRequest<Boolean> request = + inStore(STORE_NAME).withQuery(query).requireActive(); + final Set<Integer> partitions = mkSet(0, 1); + + kafkaStreams.start(); + + final Field threadsField = KafkaStreams.class.getDeclaredField("threads"); + threadsField.setAccessible(true); + @SuppressWarnings("unchecked") final List<StreamThread> threads = + (List<StreamThread>) threadsField.get(kafkaStreams); + final StreamThread streamThread = threads.get(0); + + final Field stateLock = StreamThread.class.getDeclaredField("stateLock"); + stateLock.setAccessible(true); + final Object lock = stateLock.get(streamThread); + + // wait for the desired partitions to be assigned + IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal( + kafkaStreams, + inStore(STORE_NAME).withQuery(query), + partitions + ); + + // then lock the thread state, change it, and make our assertions. 
+ synchronized (lock) { + final Field stateField = StreamThread.class.getDeclaredField("state"); + stateField.setAccessible(true); + stateField.set(streamThread, State.PARTITIONS_ASSIGNED); + + final StateQueryResult<Boolean> result = + IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal( + kafkaStreams, + request, + partitions + ); + + assertThat(result.getPartitionResults().keySet(), is(partitions)); + for (final Integer partition : partitions) { + assertThat(result.getPartitionResults().get(partition).isFailure(), is(true)); + assertThat( + result.getPartitionResults().get(partition).getFailureReason(), + is(FailureReason.NOT_UP_TO_BOUND) + ); + assertThat( + result.getPartitionResults().get(partition).getFailureMessage(), + is("Query requires a running active task," + + " but partition was in state PARTITIONS_ASSIGNED and was active.") + ); + } + } + } + + @Test + public void shouldFetchFromPartition() { + final PingQuery query = new PingQuery(); + final int partition = 1; + final Set<Integer> partitions = singleton(partition); + final StateQueryRequest<Boolean> request = + inStore(STORE_NAME).withQuery(query).withPartitions(partitions); + + kafkaStreams.start(); + final StateQueryResult<Boolean> result = + IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(kafkaStreams, request, partitions); + + assertThat(result.getPartitionResults().keySet(), equalTo(partitions)); + } + + @Test + public void shouldFetchExplicitlyFromAllPartitions() { + final PingQuery query = new PingQuery(); + final Set<Integer> partitions = mkSet(0, 1); + final StateQueryRequest<Boolean> request = + inStore(STORE_NAME).withQuery(query).withAllPartitions(); + + kafkaStreams.start(); + final StateQueryResult<Boolean> result = + IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(kafkaStreams, request, partitions); + + assertThat(result.getPartitionResults().keySet(), equalTo(partitions)); + } + + @Test + public void shouldNotRequireQueryHandler() { + final PingQuery query = new PingQuery(); + 
final int partition = 1; + final Set<Integer> partitions = singleton(partition); + final StateQueryRequest<Boolean> request = + inStore(STORE_NAME).withQuery(query).withPartitions(partitions); + + final StreamsBuilder builder = new StreamsBuilder(); + + builder.table( + INPUT_TOPIC_NAME, + Consumed.with(Serdes.Integer(), Serdes.Integer()), + Materialized.as(new KeyValueBytesStoreSupplier() { + @Override + public String name() { + return STORE_NAME; + } + + @Override + public KeyValueStore<Bytes, byte[]> get() { + return new KeyValueStore<Bytes, byte[]>() { + private boolean open = false; + private Map<Bytes, byte[]> map = new HashMap<>(); + + @Override + public void put(final Bytes key, final byte[] value) { + map.put(key, value); + } + + @Override + public byte[] putIfAbsent(final Bytes key, final byte[] value) { + return map.putIfAbsent(key, value); + } + + @Override + public void putAll(final List<KeyValue<Bytes, byte[]>> entries) { + for (final KeyValue<Bytes, byte[]> entry : entries) { + map.put(entry.key, entry.value); + } + } + + @Override + public byte[] delete(final Bytes key) { + return map.remove(key); + } + + @Override + public String name() { + return STORE_NAME; + } + + @Override + public void init(final ProcessorContext context, final StateStore root) { + context.register(root, (key, value) -> put(Bytes.wrap(key), value)); + this.open = true; + } + + @Override + public void flush() { + + } + + @Override + public void close() { + this.open = false; + map.clear(); + } + + @Override + public boolean persistent() { + return false; + } + + @Override + public boolean isOpen() { + return open; + } + + @Override + public byte[] get(final Bytes key) { + return map.get(key); + } + + @Override + public KeyValueIterator<Bytes, byte[]> range( + final Bytes from, + final Bytes to + ) { + throw new UnsupportedOperationException(); + } + + @Override + public KeyValueIterator<Bytes, byte[]> all() { + throw new UnsupportedOperationException(); + } + + @Override + 
public long approximateNumEntries() { + return map.size(); + } + }; + } + + @Override + public String metricsScope() { + return "nonquery"; + } + }) + ); + + // Discard the default instance built in beforeTest() before replacing it; + // afterTest() only closes whatever kafkaStreams currently references. + kafkaStreams.close(); + kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration()); + kafkaStreams.cleanUp(); + + kafkaStreams.start(); + final StateQueryResult<Boolean> result = + IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(kafkaStreams, request, partitions); + + final QueryResult<Boolean> queryResult = result.getPartitionResults().get(partition); + assertThat(queryResult.isFailure(), is(true)); + assertThat(queryResult.getFailureReason(), is(FailureReason.UNKNOWN_QUERY_TYPE)); + assertThat(queryResult.getFailureMessage(), matchesPattern( + "This store (.*) doesn't know how to execute the given query (.*)." + + " Contact the store maintainer if you need support for a new query type." + )); + } + + + /** + * Builds the Streams configuration used by this test class. Every call claims the + * next application-server port by incrementing the static port counter. + */ + private static Properties streamsConfiguration() { + final String safeTestName = IQv2IntegrationTest.class.getName(); + final Properties config = new Properties(); + config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); + config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); + config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + (++port)); + config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); + config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); + config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); + config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); + config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); + config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200); + config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); + config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1); + return config; + } +} 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java new file mode 100644 index 0000000..0c0ba39 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java @@ -0,0 +1,559 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.SessionWindows; +import org.apache.kafka.streams.kstream.TimeWindows; +import org.apache.kafka.streams.query.FailureReason; +import org.apache.kafka.streams.query.Position; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.query.StateQueryRequest; +import org.apache.kafka.streams.query.StateQueryResult; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.SessionBytesStoreSupplier; +import org.apache.kafka.streams.state.SessionStore; +import org.apache.kafka.streams.state.StoreSupplier; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.WindowBytesStoreSupplier; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.internals.PingQuery; +import org.apache.kafka.test.IntegrationTest; +import 
org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; + +import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.apache.kafka.streams.query.StateQueryRequest.inStore; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.matchesPattern; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.jupiter.api.Assertions.assertThrows; + +@Category({IntegrationTest.class}) +@RunWith(value = Parameterized.class) +public class IQv2StoreIntegrationTest { + + private static final int NUM_BROKERS = 1; + public static final Duration WINDOW_SIZE = Duration.ofMinutes(5); + private static int port = 0; + private static final String INPUT_TOPIC_NAME = "input-topic"; + private static final Position INPUT_POSITION = Position.emptyPosition(); + private static final String STORE_NAME = "kv-store"; + + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); + private final StoresToTest storeToTest; + + public static class UnknownQuery implements Query<Void> { + + } + + private final boolean 
cache; + private final boolean log; + + private KafkaStreams kafkaStreams; + + public enum StoresToTest { + GLOBAL_IN_MEMORY_KV { + @Override + public StoreSupplier<?> supplier() { + return Stores.inMemoryKeyValueStore(STORE_NAME); + } + + @Override + public boolean global() { + return true; + } + }, + GLOBAL_IN_MEMORY_LRU { + @Override + public StoreSupplier<?> supplier() { + return Stores.lruMap(STORE_NAME, 100); + } + + @Override + public boolean global() { + return true; + } + }, + GLOBAL_ROCKS_KV { + @Override + public StoreSupplier<?> supplier() { + return Stores.persistentKeyValueStore(STORE_NAME); + } + + @Override + public boolean global() { + return true; + } + }, + GLOBAL_TIME_ROCKS_KV { + @Override + public StoreSupplier<?> supplier() { + return Stores.persistentTimestampedKeyValueStore(STORE_NAME); + } + + @Override + public boolean global() { + return true; + } + }, + IN_MEMORY_KV { + @Override + public StoreSupplier<?> supplier() { + return Stores.inMemoryKeyValueStore(STORE_NAME); + } + }, + IN_MEMORY_LRU { + @Override + public StoreSupplier<?> supplier() { + return Stores.lruMap(STORE_NAME, 100); + } + }, + ROCKS_KV { + @Override + public StoreSupplier<?> supplier() { + return Stores.persistentKeyValueStore(STORE_NAME); + } + }, + TIME_ROCKS_KV { + @Override + public StoreSupplier<?> supplier() { + return Stores.persistentTimestampedKeyValueStore(STORE_NAME); + } + }, + IN_MEMORY_WINDOW { + @Override + public StoreSupplier<?> supplier() { + return Stores.inMemoryWindowStore(STORE_NAME, Duration.ofDays(1), WINDOW_SIZE, + false); + } + }, + ROCKS_WINDOW { + @Override + public StoreSupplier<?> supplier() { + return Stores.persistentWindowStore(STORE_NAME, Duration.ofDays(1), WINDOW_SIZE, + false); + } + }, + TIME_ROCKS_WINDOW { + @Override + public StoreSupplier<?> supplier() { + return Stores.persistentTimestampedWindowStore(STORE_NAME, Duration.ofDays(1), + WINDOW_SIZE, false); + } + }, + IN_MEMORY_SESSION { + @Override + public StoreSupplier<?> 
supplier() { + return Stores.inMemorySessionStore(STORE_NAME, Duration.ofDays(1)); + } + }, + ROCKS_SESSION { + @Override + public StoreSupplier<?> supplier() { + return Stores.persistentSessionStore(STORE_NAME, Duration.ofDays(1)); + } + }; + + public abstract StoreSupplier<?> supplier(); + + public boolean global() { + return false; + } + } + + @Parameterized.Parameters(name = "cache={0}, log={1}, supplier={2}") + public static Collection<Object[]> data() { + final List<Object[]> values = new ArrayList<>(); + for (final boolean cacheEnabled : Arrays.asList(true, false)) { + for (final boolean logEnabled : Arrays.asList(true, false)) { + for (final StoresToTest toTest : StoresToTest.values()) { + values.add(new Object[]{cacheEnabled, logEnabled, toTest.name()}); + } + } + } + return values; + } + + public IQv2StoreIntegrationTest( + final boolean cache, + final boolean log, + final String storeToTest) { + + this.cache = cache; + this.log = log; + this.storeToTest = StoresToTest.valueOf(storeToTest); + } + + @BeforeClass + public static void before() + throws InterruptedException, IOException, ExecutionException, TimeoutException { + CLUSTER.start(); + final int partitions = 2; + CLUSTER.createTopic(INPUT_TOPIC_NAME, partitions, 1); + + final Properties producerProps = new Properties(); + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); + + final List<Future<RecordMetadata>> futures = new LinkedList<>(); + try (final Producer<Integer, Integer> producer = new KafkaProducer<>(producerProps)) { + for (int i = 0; i < 3; i++) { + final Future<RecordMetadata> send = producer.send( + new ProducerRecord<>( + INPUT_TOPIC_NAME, + i % partitions, + Time.SYSTEM.milliseconds(), + i, + i, + null + ) + ); + futures.add(send); + Time.SYSTEM.sleep(1L); + } + 
producer.flush(); + + for (final Future<RecordMetadata> future : futures) { + final RecordMetadata recordMetadata = future.get(1, TimeUnit.MINUTES); + assertThat(recordMetadata.hasOffset(), is(true)); + INPUT_POSITION.withComponent( + recordMetadata.topic(), + recordMetadata.partition(), + recordMetadata.offset() + ); + } + } + + assertThat(INPUT_POSITION, equalTo( + Position + .emptyPosition() + .withComponent(INPUT_TOPIC_NAME, 0, 1L) + .withComponent(INPUT_TOPIC_NAME, 1, 0L) + )); + } + + /** + * Builds the topology for the parameterized store. Key-value suppliers back a table + * (or a global table when storeToTest.global() is true); window and session suppliers + * back aggregations over the input stream. Caching and changelogging follow the + * cache and log test parameters. + */ + @Before + public void beforeTest() { + final StreamsBuilder builder = new StreamsBuilder(); + final StoreSupplier<?> supplier = storeToTest.supplier(); + if (supplier instanceof KeyValueBytesStoreSupplier) { + final Materialized<Integer, Integer, KeyValueStore<Bytes, byte[]>> materialized = + Materialized.as((KeyValueBytesStoreSupplier) supplier); + + if (cache) { + materialized.withCachingEnabled(); + } else { + materialized.withCachingDisabled(); + } + + if (log) { + materialized.withLoggingEnabled(Collections.emptyMap()); + } else { + // log == false must switch off changelogging; caching was already set above + materialized.withLoggingDisabled(); + } + + if (storeToTest.global()) { + builder + .globalTable( + INPUT_TOPIC_NAME, + Consumed.with(Serdes.Integer(), Serdes.Integer()), + materialized + ); + } else { + builder + .table( + INPUT_TOPIC_NAME, + Consumed.with(Serdes.Integer(), Serdes.Integer()), + materialized + ); + } + } else if (supplier instanceof WindowBytesStoreSupplier) { + final Materialized<Integer, Integer, WindowStore<Bytes, byte[]>> materialized = + Materialized.as((WindowBytesStoreSupplier) supplier); + + if (cache) { + materialized.withCachingEnabled(); + } else { + materialized.withCachingDisabled(); + } + + if (log) { + materialized.withLoggingEnabled(Collections.emptyMap()); + } else { + materialized.withLoggingDisabled(); + } + + builder + .stream( + INPUT_TOPIC_NAME, + Consumed.with(Serdes.Integer(), Serdes.Integer()) + ) + .groupByKey() + .windowedBy(TimeWindows.ofSizeWithNoGrace(WINDOW_SIZE)) + .aggregate( + () -> 0, + (key, 
value, aggregate) -> aggregate + value, + materialized + ); + } else if (supplier instanceof SessionBytesStoreSupplier) { + final Materialized<Integer, Integer, SessionStore<Bytes, byte[]>> materialized = + Materialized.as((SessionBytesStoreSupplier) supplier); + + if (cache) { + materialized.withCachingEnabled(); + } else { + materialized.withCachingDisabled(); + } + + if (log) { + materialized.withLoggingEnabled(Collections.emptyMap()); + } else { + materialized.withLoggingDisabled(); + } + + builder + .stream( + INPUT_TOPIC_NAME, + Consumed.with(Serdes.Integer(), Serdes.Integer()) + ) + .groupByKey() + .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(WINDOW_SIZE)) + .aggregate( + () -> 0, + (key, value, aggregate) -> aggregate + value, + (aggKey, aggOne, aggTwo) -> aggOne + aggTwo, + materialized + ); + } else { + throw new AssertionError("Store supplier is an unrecognized type."); + } + + // Don't need to wait for running, since tests can use iqv2 to wait until they + // get a valid response. 
+ kafkaStreams = + IntegrationTestUtils.getStartedStreams( + streamsConfiguration( + cache, + log, + supplier.getClass().getSimpleName() + ), + builder, + true + ); + } + + @After + public void afterTest() { + kafkaStreams.close(); + kafkaStreams.cleanUp(); + } + + @AfterClass + public static void after() { + CLUSTER.stop(); + } + + @Test + public void verifyStore() { + shouldRejectUnknownQuery(); + shouldHandlePingQuery(); + shouldCollectExecutionInfo(); + shouldCollectExecutionInfoUnderFailure(); + } + + public void shouldRejectUnknownQuery() { + + final UnknownQuery query = new UnknownQuery(); + final StateQueryRequest<Void> request = inStore(STORE_NAME).withQuery(query); + final Set<Integer> partitions = mkSet(0, 1); + + final StateQueryResult<Void> result = + IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(kafkaStreams, request, partitions); + + makeAssertions( + partitions, + result, + queryResult -> { + assertThat(queryResult.isFailure(), is(true)); + assertThat(queryResult.isSuccess(), is(false)); + assertThat(queryResult.getPosition(), is(nullValue())); + assertThat(queryResult.getFailureReason(), + is(FailureReason.UNKNOWN_QUERY_TYPE)); + assertThat(queryResult.getFailureMessage(), + matchesPattern( + "This store (.*)" + + " doesn't know how to execute the given query" + + " (.*)." + + " Contact the store maintainer if you need support for a new query type." 
+ ) + ); + assertThrows(IllegalArgumentException.class, queryResult::getResult); + + assertThat(queryResult.getExecutionInfo(), is(empty())); + } + ); + } + + public void shouldHandlePingQuery() { + + final PingQuery query = new PingQuery(); + final StateQueryRequest<Boolean> request = + inStore(STORE_NAME).withQuery(query); + final Set<Integer> partitions = mkSet(0, 1); + + final StateQueryResult<Boolean> result = + IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(kafkaStreams, request, partitions); + + makeAssertions( + partitions, + result, + queryResult -> { + final boolean failure = queryResult.isFailure(); + if (failure) { + assertThat(queryResult.toString(), failure, is(false)); + } + assertThat(queryResult.isSuccess(), is(true)); + + // TODO: position not implemented + assertThat(queryResult.getPosition(), is(nullValue())); + + assertThrows(IllegalArgumentException.class, queryResult::getFailureReason); + assertThrows(IllegalArgumentException.class, queryResult::getFailureMessage); + + assertThat(queryResult.getResult(), is(true)); + + assertThat(queryResult.getExecutionInfo(), is(empty())); + }); + } + + public void shouldCollectExecutionInfo() { + + final PingQuery query = new PingQuery(); + final StateQueryRequest<Boolean> request = + inStore(STORE_NAME).withQuery(query).enableExecutionInfo(); + final Set<Integer> partitions = mkSet(0, 1); + + final StateQueryResult<Boolean> result = + IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(kafkaStreams, request, partitions); + + makeAssertions( + partitions, + result, + queryResult -> assertThat(queryResult.getExecutionInfo(), not(empty())) + ); + } + + public void shouldCollectExecutionInfoUnderFailure() { + + final UnknownQuery query = new UnknownQuery(); + final StateQueryRequest<Void> request = + inStore(STORE_NAME).withQuery(query).enableExecutionInfo(); + final Set<Integer> partitions = mkSet(0, 1); + + final StateQueryResult<Void> result = + 
IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(kafkaStreams, request, partitions); + + makeAssertions( + partitions, + result, + queryResult -> assertThat(queryResult.getExecutionInfo(), not(empty())) + ); + } + + private <R> void makeAssertions( + final Set<Integer> partitions, + final StateQueryResult<R> result, + final Consumer<QueryResult<R>> assertion) { + + if (result.getGlobalResult() != null) { + assertion.accept(result.getGlobalResult()); + } else { + assertThat(result.getPartitionResults().keySet(), is(partitions)); + for (final Integer partition : partitions) { + assertion.accept(result.getPartitionResults().get(partition)); + } + } + } + + private static Properties streamsConfiguration(final boolean cache, final boolean log, + final String supplier) { + final String safeTestName = + IQv2StoreIntegrationTest.class.getName() + "-" + cache + "-" + log + "-" + supplier; + final Properties config = new Properties(); + config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); + config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); + config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + (++port)); + config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); + config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); + config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); + config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); + config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); + config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200); + config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); + config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1); + return config; + } +} diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index da457cb..6cfc5d4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -32,6 +32,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.Metric; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState; import org.apache.kafka.common.utils.Time; @@ -50,6 +51,8 @@ import org.apache.kafka.streams.processor.internals.StreamThread; import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator; import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentListener; import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper; +import org.apache.kafka.streams.query.StateQueryRequest; +import org.apache.kafka.streams.query.StateQueryResult; import org.apache.kafka.streams.state.QueryableStoreType; import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; @@ -104,6 +107,40 @@ public class IntegrationTestUtils { public static final long DEFAULT_TIMEOUT = 60 * 1000L; private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestUtils.class); + /** + * Repeatedly runs the query until the response is valid and then returns the response. + * <p> + * Validity in this case means that the response contains all the desired partitions or that + * it's a global response. 
+ * <p> + * Once position bounding is generally supported, we should migrate tests to wait on the + * expected response position. + * + * @param kafkaStreams the Kafka Streams instance to query + * @param request the query request to execute on each attempt + * @param partitions the partitions that a non-global response must contain + * @return the first response that is global or contains all the desired partitions + * @throws TimeoutException if no valid response is returned within {@link #DEFAULT_TIMEOUT} ms + * @throws RuntimeException if the calling thread is interrupted while waiting between attempts + */ + public static <R> StateQueryResult<R> iqv2WaitForPartitionsOrGlobal( + final KafkaStreams kafkaStreams, + final StateQueryRequest<R> request, + final Set<Integer> partitions) { + + final long start = System.currentTimeMillis(); + final long deadline = start + DEFAULT_TIMEOUT; + + do { + final StateQueryResult<R> result = kafkaStreams.query(request); + if (result.getPartitionResults().keySet().containsAll(partitions) + || result.getGlobalResult() != null) { + return result; + } else { + try { + Thread.sleep(100L); + } catch (final InterruptedException e) { + // Restore the interrupt status so that callers can still observe the interruption. + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + } while (System.currentTimeMillis() < deadline); + + throw new TimeoutException("The query never returned the desired partitions"); + } + /* * Records state transition for StreamThread */ diff --git a/streams/src/test/java/org/apache/kafka/streams/query/PositionBoundTest.java b/streams/src/test/java/org/apache/kafka/streams/query/PositionBoundTest.java new file mode 100644 index 0000000..fd881bc --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/query/PositionBoundTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.kafka.streams.query; + + import org.junit.Test; + + import java.util.HashMap; + import java.util.HashSet; + + import static org.apache.kafka.common.utils.Utils.mkSet; + import static org.hamcrest.MatcherAssert.assertThat; + import static org.hamcrest.Matchers.empty; + import static org.hamcrest.Matchers.equalTo; + import static org.junit.Assert.assertEquals; + import static org.junit.Assert.assertNotEquals; + import static org.junit.Assert.assertThrows; + import static org.junit.Assert.assertTrue; + /** + * Unit tests for {@link PositionBound}: a bound created from a position is unaffected by later + * changes to that position, an unbounded bound reports itself as unbounded and rejects + * {@code position()}, equality behaves as expected, and {@code hashCode()} is unsupported. + */ + public class PositionBoundTest { + + @Test + public void shouldCopyPosition() { + final Position position = Position.emptyPosition(); + final PositionBound positionBound = PositionBound.at(position); + position.withComponent("topic", 1, 2L); + + assertThat(position.getTopics(), equalTo(mkSet("topic"))); + assertThat(positionBound.position().getTopics(), empty()); + } + + @Test + public void unboundedShouldBeUnbounded() { + final PositionBound bound = PositionBound.unbounded(); + assertTrue(bound.isUnbounded()); + } + + @Test + public void unboundedShouldThrowOnPosition() { + final PositionBound bound = PositionBound.unbounded(); + assertThrows(IllegalArgumentException.class, bound::position); + } + + @Test + public void shouldEqualPosition() { + final PositionBound bound1 = PositionBound.at(Position.emptyPosition()); + final PositionBound bound2 = PositionBound.at(Position.emptyPosition()); + assertEquals(bound1, bound2); + } + + @Test + public void shouldEqualUnbounded() { + final PositionBound bound1 = PositionBound.unbounded(); + final PositionBound bound2 = PositionBound.unbounded(); + assertEquals(bound1, bound2); + } + + @Test + public void shouldEqualSelf() { + final PositionBound bound1 = PositionBound.at(Position.emptyPosition()); + assertEquals(bound1, bound1); + } + + @Test + public void shouldNotEqualNull() { + final PositionBound 
bound1 = PositionBound.at(Position.emptyPosition()); + assertNotEquals(bound1, null); + } + + @Test + public void shouldNotHash() { + final PositionBound bound = PositionBound.at(Position.emptyPosition()); + assertThrows(UnsupportedOperationException.class, bound::hashCode); + + // going overboard... + final HashSet<PositionBound> set = new HashSet<>(); + assertThrows(UnsupportedOperationException.class, () -> set.add(bound)); + + final HashMap<PositionBound, Integer> map = new HashMap<>(); + assertThrows(UnsupportedOperationException.class, () -> map.put(bound, 5)); + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/query/PositionTest.java b/streams/src/test/java/org/apache/kafka/streams/query/PositionTest.java new file mode 100644 index 0000000..bee7a36 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/query/PositionTest.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.kafka.streams.query; + + import org.junit.Test; + + import java.util.Collections; + import java.util.HashMap; + import java.util.HashSet; + import java.util.Map; + + import static org.apache.kafka.common.utils.Utils.mkEntry; + import static org.apache.kafka.common.utils.Utils.mkMap; + import static org.apache.kafka.common.utils.Utils.mkSet; + import static org.hamcrest.MatcherAssert.assertThat; + import static org.hamcrest.Matchers.equalTo; + import static org.junit.Assert.assertEquals; + import static org.junit.Assert.assertNotEquals; + import static org.junit.Assert.assertThrows; + /** + * Unit tests for {@link Position}: creation from a map (including a null map and independence + * from later changes to the source map), merging (including merging null), copying, equality, + * and the unsupported {@code hashCode()}. + */ + public class PositionTest { + + @Test + public void shouldCreateFromMap() { + final Map<String, Map<Integer, Long>> map = mkMap( + mkEntry("topic", mkMap(mkEntry(0, 5L))), + mkEntry("topic1", mkMap( + mkEntry(0, 5L), + mkEntry(7, 0L) + )) + ); + + final Position position = Position.fromMap(map); + assertThat(position.getTopics(), equalTo(mkSet("topic", "topic1"))); + assertThat(position.getBound("topic"), equalTo(mkMap(mkEntry(0, 5L)))); + + // Should be a copy of the constructor map + + map.get("topic1").put(99, 99L); + + // so the position is still the original one + assertThat(position.getBound("topic1"), equalTo(mkMap( + mkEntry(0, 5L), + mkEntry(7, 0L) + ))); + } + + @Test + public void shouldCreateFromNullMap() { + final Position position = Position.fromMap(null); + assertThat(position.getTopics(), equalTo(Collections.emptySet())); + } + + @Test + public void shouldMerge() { + final Position position = Position.fromMap(mkMap( + mkEntry("topic", mkMap(mkEntry(0, 5L))), + mkEntry("topic1", mkMap( + mkEntry(0, 5L), + mkEntry(7, 0L) + )) + )); + + final Position position1 = Position.fromMap(mkMap( + mkEntry("topic", mkMap(mkEntry(0, 7L))), // update offset + mkEntry("topic1", mkMap(mkEntry(8, 1L))), // add partition + mkEntry("topic2", mkMap(mkEntry(9, 5L))) // add topic + )); + + final Position merged = position.merge(position1); + + assertThat(merged.getTopics(), 
equalTo(mkSet("topic", "topic1", "topic2"))); + assertThat(merged.getBound("topic"), equalTo(mkMap(mkEntry(0, 7L)))); + assertThat(merged.getBound("topic1"), equalTo(mkMap( + mkEntry(0, 5L), + mkEntry(7, 0L), + mkEntry(8, 1L) + ))); + assertThat(merged.getBound("topic2"), equalTo(mkMap(mkEntry(9, 5L)))); + } + + @Test + public void shouldCopy() { + final Position position = Position.fromMap(mkMap( + mkEntry("topic", mkMap(mkEntry(0, 5L))), + mkEntry("topic1", mkMap( + mkEntry(0, 5L), + mkEntry(7, 0L) + )) + )); + + final Position copy = position.copy(); + + // mutate original + position.withComponent("topic", 0, 6L); + position.withComponent("topic1", 8, 1L); + position.withComponent("topic2", 2, 4L); + + // copy has not changed + assertThat(copy.getTopics(), equalTo(mkSet("topic", "topic1"))); + assertThat(copy.getBound("topic"), equalTo(mkMap(mkEntry(0, 5L)))); + assertThat(copy.getBound("topic1"), equalTo(mkMap( + mkEntry(0, 5L), + mkEntry(7, 0L) + ))); + + // original has changed + assertThat(position.getTopics(), equalTo(mkSet("topic", "topic1", "topic2"))); + assertThat(position.getBound("topic"), equalTo(mkMap(mkEntry(0, 6L)))); + assertThat(position.getBound("topic1"), equalTo(mkMap( + mkEntry(0, 5L), + mkEntry(7, 0L), + mkEntry(8, 1L) + ))); + assertThat(position.getBound("topic2"), equalTo(mkMap(mkEntry(2, 4L)))); + } + + @Test + public void shouldMergeNull() { + final Position position = Position.fromMap(mkMap( + mkEntry("topic", mkMap(mkEntry(0, 5L))), + mkEntry("topic1", mkMap( + mkEntry(0, 5L), + mkEntry(7, 0L) + )) + )); + + final Position merged = position.merge(null); + + assertThat(merged.getTopics(), equalTo(mkSet("topic", "topic1"))); + assertThat(merged.getBound("topic"), equalTo(mkMap(mkEntry(0, 5L)))); + assertThat(merged.getBound("topic1"), equalTo(mkMap( + mkEntry(0, 5L), + mkEntry(7, 0L) + ))); + } + + @Test + public void shouldMatchOnEqual() { + final Position position1 = Position.emptyPosition(); + final Position position2 = 
Position.emptyPosition(); + position1.withComponent("topic1", 0, 1); + position2.withComponent("topic1", 0, 1); + + position1.withComponent("topic1", 1, 2); + position2.withComponent("topic1", 1, 2); + + position1.withComponent("topic1", 2, 1); + position2.withComponent("topic1", 2, 1); + + position1.withComponent("topic2", 0, 0); + position2.withComponent("topic2", 0, 0); + + assertEquals(position1, position2); + } + + @Test + public void shouldNotMatchOnUnEqual() { + final Position position1 = Position.emptyPosition(); + final Position position2 = Position.emptyPosition(); + position1.withComponent("topic1", 0, 1); + position2.withComponent("topic1", 0, 1); + + position1.withComponent("topic1", 1, 2); + + position1.withComponent("topic1", 2, 1); + position2.withComponent("topic1", 2, 1); + + position1.withComponent("topic2", 0, 0); + position2.withComponent("topic2", 0, 0); + + assertNotEquals(position1, position2); + } + + @Test + public void shouldNotMatchNull() { + final Position position = Position.emptyPosition(); + assertNotEquals(position, null); + } + + @Test + public void shouldMatchSelf() { + final Position position = Position.emptyPosition(); + assertEquals(position, position); + } + + @Test + public void shouldNotHash() { + final Position position = Position.emptyPosition(); + assertThrows(UnsupportedOperationException.class, position::hashCode); + + // going overboard... 
+ final HashSet<Position> set = new HashSet<>(); + assertThrows(UnsupportedOperationException.class, () -> set.add(position)); + + final HashMap<Position, Integer> map = new HashMap<>(); + assertThrows(UnsupportedOperationException.class, () -> map.put(position, 5)); + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java index 2809050..864500a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.state.internals; -import java.util.Optional; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.MetricConfig; @@ -72,6 +71,7 @@ import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.UUID;
