This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch iqv2-framework
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 4fe22d4553889170600438f809f816b9c3d40a86
Author: John Roesler <[email protected]>
AuthorDate: Thu Oct 14 12:53:11 2021 -0500

    IQv2 Framework
---
 .../org/apache/kafka/streams/KafkaStreams.java     | 111 +++-
 .../streams/errors/StreamsStoppedException.java    |  38 ++
 .../apache/kafka/streams/processor/StateStore.java |  33 ++
 .../kafka/streams/processor/StateStoreContext.java |   4 +-
 .../apache/kafka/streams/query/FailureReason.java  |  47 ++
 .../org/apache/kafka/streams/query/Position.java   | 172 +++++++
 .../apache/kafka/streams/query/PositionBound.java  | 109 ++++
 .../java/org/apache/kafka/streams/query/Query.java |  35 ++
 .../apache/kafka/streams/query/QueryResult.java    | 210 ++++++++
 .../kafka/streams/query/StateQueryRequest.java     | 225 +++++++++
 .../kafka/streams/query/StateQueryResult.java      | 114 +++++
 .../apache/kafka/streams/state/StateSerdes.java    |   9 +
 .../org/apache/kafka/streams/state/Stores.java     |  19 +-
 .../InMemoryKeyValueBytesStoreSupplier.java        |  45 ++
 .../state/internals/InMemoryKeyValueStore.java     |  17 +
 .../state/internals/InMemorySessionStore.java      |   9 +
 .../state/internals/InMemoryWindowStore.java       |  14 +
 ...ValueToTimestampedKeyValueByteStoreAdapter.java |  21 +
 .../state/internals/MemoryNavigableLRUCache.java   |  17 +
 .../kafka/streams/state/internals/PingQuery.java   |  29 ++
 .../state/internals/RocksDBSessionStore.java       |   9 +
 .../streams/state/internals/RocksDBStore.java      |  14 +
 .../state/internals/RocksDBWindowStore.java        |   9 +
 .../streams/state/internals/StoreQueryUtils.java   |  52 ++
 .../internals/TimestampedKeyValueStoreBuilder.java |  17 +
 .../internals/TimestampedWindowStoreBuilder.java   |  17 +
 .../WindowToTimestampedWindowByteStoreAdapter.java |  20 +
 .../streams/state/internals/WrappedStateStore.java |  19 +
 .../streams/integration/IQv2IntegrationTest.java   | 433 ++++++++++++++++
 .../integration/IQv2StoreIntegrationTest.java      | 559 +++++++++++++++++++++
 .../integration/utils/IntegrationTestUtils.java    |  37 ++
 .../kafka/streams/query/PositionBoundTest.java     |  96 ++++
 .../apache/kafka/streams/query/PositionTest.java   | 214 ++++++++
 .../streams/state/internals/RocksDBStoreTest.java  |   2 +-
 34 files changed, 2753 insertions(+), 23 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index bb0c40a..ee0e503 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.KafkaMetricsContext;
@@ -40,27 +41,34 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.errors.InvalidStateStorePartitionException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.StreamsNotStartedException;
+import org.apache.kafka.streams.errors.StreamsStoppedException;
 import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
 import org.apache.kafka.streams.errors.UnknownStateStoreException;
-import org.apache.kafka.streams.errors.InvalidStateStorePartitionException;
 import org.apache.kafka.streams.internals.metrics.ClientMetrics;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ClientUtils;
 import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
 import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
-import org.apache.kafka.streams.processor.internals.TopologyMetadata;
 import org.apache.kafka.streams.processor.internals.StateDirectory;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
 import org.apache.kafka.streams.processor.internals.Task;
 import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
+import org.apache.kafka.streams.processor.internals.TopologyMetadata;
 import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.StateQueryRequest;
+import org.apache.kafka.streams.query.StateQueryResult;
 import org.apache.kafka.streams.state.HostInfo;
 import org.apache.kafka.streams.state.internals.GlobalStateStoreProvider;
 import org.apache.kafka.streams.state.internals.QueryableStoreProvider;
@@ -78,17 +86,18 @@ import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
@@ -1716,4 +1725,100 @@ public class KafkaStreams implements AutoCloseable {
 
         return Collections.unmodifiableMap(localStorePartitionLags);
     }
+
+    /**
+     * Run an interactive query against a state store.
+     * <p>
+     * This method allows callers outside of the Streams runtime to access the internal state of
+     * stateful processors. See https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html
+     * for more information.
+     *
+     * @param <R> The result type specified by the query.
+     * @throws StreamsNotStartedException If Streams has not yet been started. Just call {@link
+     *                                    KafkaStreams#start()} and then retry this call.
+     * @throws StreamsStoppedException    If Streams is in a terminal state like PENDING_SHUTDOWN,
+     *                                    NOT_RUNNING, PENDING_ERROR, or ERROR. The caller should
+     *                                    discover a new instance to query.
+     * @throws UnknownStateStoreException If the specified store name does not exist in the
+     *                                    topology.
+     */
+    @Evolving
+    public <R> StateQueryResult<R> query(final StateQueryRequest<R> request) {
+        final String storeName = request.getStoreName();
+        if (!topologyMetadata.hasStore(storeName)) {
+            throw new UnknownStateStoreException(
+                "Cannot get state store "
+                    + storeName
+                    + " because no such store is registered in the topology."
+            );
+        }
+        if (state().hasNotStarted()) {
+            throw new StreamsNotStartedException(
+                "KafkaStreams has not been started, you can retry after 
calling start()."
+            );
+        }
+        if (state().isShuttingDown() || state.hasCompletedShutdown()) {
+            throw new StreamsStoppedException(
+                "KafkaStreams has been stopped (" + state + ")."
+                    + " This instance can no longer serve queries."
+            );
+        }
+        final StateQueryResult<R> result = new StateQueryResult<>();
+
+        final Map<String, StateStore> globalStateStores = topologyMetadata.globalStateStores();
+        if (globalStateStores.containsKey(storeName)) {
+            final StateStore store = globalStateStores.get(storeName);
+            final QueryResult<R> r =
+                store.query(
+                    request.getQuery(),
+                    request.getPositionBound(),
+                    request.executionInfoEnabled()
+                );
+            result.setGlobalResult(r);
+        } else {
+            for (final StreamThread thread : threads) {
+                final Map<TaskId, Task> tasks = thread.allTasks();
+                for (final Entry<TaskId, Task> entry : tasks.entrySet()) {
+
+                    final TaskId taskId = entry.getKey();
+                    final int partition = taskId.partition();
+                    if (request.isAllPartitions()
+                        || request.getPartitions().contains(partition)) {
+                        final Task task = entry.getValue();
+                        final StateStore store = task.getStore(storeName);
+                        if (store != null) {
+                            final StreamThread.State state = thread.state();
+                            final boolean active = task.isActive();
+                            if (request.isRequireActive()
+                                && (state != StreamThread.State.RUNNING
+                                || !active)) {
+                                result.addResult(
+                                    partition,
+                                    QueryResult.forFailure(
+                                        FailureReason.NOT_UP_TO_BOUND,
+                                        "Query requires a running active task,"
+                                            + " but partition was in state "
+                                            + state + " and was "
+                                            + (active ? "active" : "not active") + "."
+                                    )
+                                );
+                            } else {
+                                final QueryResult<R> r = store.query(
+                                    request.getQuery(),
+                                    request.isRequireActive()
+                                        ? PositionBound.unbounded()
+                                        : request.getPositionBound(),
+                                    request.executionInfoEnabled()
+                                );
+                                result.addResult(partition, r);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        return result;
+    }
+
 }
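
A minimal caller-side sketch of the new method (not part of this patch; the store name "my-store", the query type MyQuery, and the running kafkaStreams instance are hypothetical, while the IQv2 classes are the ones added here):

    // Build an immutable request: store name and query are required; the
    // defaults are all partitions, an unbounded position, and any replica.
    final StateQueryRequest<Long> request =
        StateQueryRequest.inStore("my-store")
            .withQuery(new MyQuery())                  // hypothetical Query<Long>
            .withPartitions(Collections.singleton(0))  // only partition 0
            .enableExecutionInfo();                    // ask for execution details

    final StateQueryResult<Long> result = kafkaStreams.query(request);

    final QueryResult<Long> partitionResult = result.getOnlyPartitionResult();
    if (partitionResult.isSuccess()) {
        System.out.println("answer: " + partitionResult.getResult());
    } else {
        System.out.println(partitionResult.getFailureReason()
            + ": " + partitionResult.getFailureMessage());
    }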
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/StreamsStoppedException.java b/streams/src/main/java/org/apache/kafka/streams/errors/StreamsStoppedException.java
new file mode 100644
index 0000000..c05708b
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/StreamsStoppedException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+import org.apache.kafka.streams.KafkaStreams.State;
+
+/**
+ * Indicates that Kafka Streams is in a terminating or terminal state, such as {@link
+ * State#PENDING_SHUTDOWN}, {@link State#PENDING_ERROR}, {@link State#NOT_RUNNING}, or {@link
+ * State#ERROR}. This Streams instance will need to be discarded and replaced before it can
+ * serve queries. The caller may wish to query a different instance.
+ */
+public class StreamsStoppedException extends InvalidStateStoreException {
+
+    private static final long serialVersionUID = 1L;
+
+    public StreamsStoppedException(final String message) {
+        super(message);
+    }
+
+    public StreamsStoppedException(final String message, final Throwable throwable) {
+        super(message, throwable);
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
index 76d1ab4..2f96020 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
@@ -16,9 +16,14 @@
  */
 package org.apache.kafka.streams.processor;
 
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
 
 /**
  * A storage engine for managing state maintained by a stream processor.
@@ -119,4 +124,32 @@ public interface StateStore {
      * @return {@code true} if the store is open
      */
     boolean isOpen();
+
+    /**
+     * Execute a query. Returns a QueryResult containing either result data or
+     * a failure.
+     * <p>
+     * If the store doesn't know how to handle the given query, the result
+     * shall be a {@link FailureReason#UNKNOWN_QUERY_TYPE}.
+     * If the store couldn't satisfy the given position bound, the result
+     * shall be a {@link FailureReason#NOT_UP_TO_BOUND}.
+     * <p>
+     * Note to store implementers: if your store does not support position tracking,
+     * you can correctly respond {@link FailureReason#NOT_UP_TO_BOUND} if the argument is
+     * anything but {@link PositionBound#unbounded()}. Be sure to explain in the failure message
+     * that bounded positions are not supported.
+     * <p>
+     * @param query The query to execute
+     * @param positionBound The position the store must be at or past
+     * @param collectExecutionInfo Whether the store should collect detailed execution info for the query
+     * @param <R> The result type
+     */
+    @Evolving
+    default <R> QueryResult<R> query(
+        Query<R> query,
+        PositionBound positionBound,
+        boolean collectExecutionInfo) {
+        // If a store doesn't implement a query handler, then all queries are unknown.
+        return QueryResult.forUnknownQueryType(query, this);
+    }
 }
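
To illustrate the extension point above, a sketch of a store overriding the default handler (MyCountQuery is a hypothetical custom query; position-bound checking is omitted for brevity):

    @Override
    public <R> QueryResult<R> query(
        final Query<R> query,
        final PositionBound positionBound,
        final boolean collectExecutionInfo) {

        if (query instanceof MyCountQuery) { // hypothetical query type
            @SuppressWarnings("unchecked")
            final QueryResult<R> result =
                (QueryResult<R>) QueryResult.forResult(approximateNumEntries());
            if (collectExecutionInfo) {
                result.addExecutionInfo("handled MyCountQuery");
            }
            return result;
        }
        // Fall back for anything this store doesn't recognize.
        return QueryResult.forUnknownQueryType(query, this);
    }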
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java
index 50d5879..f6f1446 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java
@@ -46,8 +46,8 @@ public interface StateStoreContext {
 
     /**
      * Return the metadata of the current topic/partition/offset if available.
-     * This is defined as the metadata of the record that is currently been
-     * processed by the StreamTask that holds the store.
+     * This is defined as the metadata of the record that is currently being
+     * processed (or was last processed) by the StreamTask that holds the store.
      * <p>
      * Note that the metadata is not defined during all store interactions, for
      * example, while the StreamTask is running a punctuation.
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java b/streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java
new file mode 100644
index 0000000..97e342d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+@Evolving
+public enum FailureReason {
+    /**
+     * Failure indicating that the store doesn't know how to handle the given query.
+     */
+    UNKNOWN_QUERY_TYPE,
+
+    /**
+     * Failure indicating that the store partition is not (yet) up to the desired bound.
+     * The caller should either try again later or try a different replica.
+     */
+    NOT_UP_TO_BOUND,
+
+    /**
+     * Failure indicating that the requested store partition is not present on the local
+     * KafkaStreams instance. It may have been migrated to another instance during a rebalance.
+     * The caller is recommended to try a different replica.
+     */
+    NOT_PRESENT,
+
+    /**
+     * The requested store partition does not exist at all. For example, partition 4 was requested,
+     * but the store in question only has 4 partitions (0 through 3).
+     */
+    DOES_NOT_EXIST;
+}
\ No newline at end of file
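
A caller-side sketch of reacting to each reason, reusing partitionResult from the first sketch (the retry and reroute helpers stand in for application-defined policy):

    switch (partitionResult.getFailureReason()) {
        case UNKNOWN_QUERY_TYPE:
            throw new IllegalStateException("store cannot handle this query type");
        case NOT_UP_TO_BOUND:
            retryLaterOrQueryAnotherReplica(); // hypothetical helper
            break;
        case NOT_PRESENT:
            queryAnotherInstance();            // hypothetical helper
            break;
        case DOES_NOT_EXIST:
            throw new IllegalArgumentException("no such store partition");
    }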
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/Position.java b/streams/src/main/java/org/apache/kafka/streams/query/Position.java
new file mode 100644
index 0000000..48e524f
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/query/Position.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A representation of a position vector with respect to a set of topic partitions. For example, in
+ * Interactive Query ({@link org.apache.kafka.streams.KafkaStreams#query(StateQueryRequest)}), a
+ * query result may contain information from multiple store partitions, each of which contains
+ * information from multiple input topics' partitions. This class can be used to summarize all of
+ * that positional information.
+ * <p>
+ * This class is threadsafe, although it is mutable. Readers are recommended to use {@link
+ * Position#copy()} to avoid seeing mutations to the Position after they get the reference. For
+ * example, when a store executes a {@link org.apache.kafka.streams.processor.StateStore#query(Query,
+ * PositionBound, boolean)} request and returns its current position via {@link
+ * QueryResult#setPosition(Position)}, it should pass a copy of its position instead of the mutable
+ * reference.
+ */
+@Evolving
+public class Position {
+
+    private final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> position;
+
+    private Position(final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> position) {
+        this.position = position;
+    }
+
+    /**
+     * Create a new, empty Position.
+     */
+    public static Position emptyPosition() {
+        return new Position(new ConcurrentHashMap<>());
+    }
+
+    /**
+     * Create a new Position and populate it with a mapping of topic -> partition -> offset.
+     * <p>
+     * Note, the resulting Position does not share any structure with the provided map, so
+     * subsequent changes to the map or Position will not affect the other.
+     */
+    public static Position fromMap(final Map<String, ? extends Map<Integer, Long>> map) {
+        return new Position(deepCopy(map));
+    }
+
+    /**
+     * Augment an existing Position by setting a new offset for a topic and partition.
+     * <p>
+     * Returns a self-reference for chained calls. Note: this method mutates the Position. Note
+     * also: this method does not enforce anything about the arguments, except that the topic must
+     * not be null. It is the caller's responsibility to enforce desired semantics and validity.
+     */
+    public Position withComponent(final String topic, final int partition, final long offset) {
+        position
+            .computeIfAbsent(topic, k -> new ConcurrentHashMap<>())
+            .put(partition, offset);
+        return this;
+    }
+
+    /**
+     * Create a deep copy of the Position.
+     */
+    public Position copy() {
+        return new Position(deepCopy(position));
+    }
+
+    /**
+     * Create a new, structurally independent Position that is the result of merging two other
+     * Positions.
+     * <p>
+     * If both Positions contain the same topic -> partition -> offset mapping, the resulting
+     * Position will contain a mapping with the larger of the two offsets.
+     */
+    public Position merge(final Position other) {
+        if (other == null) {
+            return this;
+        } else {
+            final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> copy =
+                deepCopy(position);
+            for (final Entry<String, ConcurrentHashMap<Integer, Long>> entry : other.position.entrySet()) {
+                final String topic = entry.getKey();
+                final Map<Integer, Long> partitionMap =
+                    copy.computeIfAbsent(topic, k -> new ConcurrentHashMap<>());
+                for (final Entry<Integer, Long> partitionOffset : entry.getValue().entrySet()) {
+                    final Integer partition = partitionOffset.getKey();
+                    final Long offset = partitionOffset.getValue();
+                    if (!partitionMap.containsKey(partition)
+                        || partitionMap.get(partition) < offset) {
+                        partitionMap.put(partition, offset);
+                    }
+                }
+            }
+            return new Position(copy);
+        }
+    }
+
+    /**
+     * Return the topics that are represented in this Position.
+     */
+    public Set<String> getTopics() {
+        return Collections.unmodifiableSet(position.keySet());
+    }
+
+    /**
+     * Return the partition -> offset mapping for a specific topic.
+     */
+    public Map<Integer, Long> getBound(final String topic) {
+        return Collections.unmodifiableMap(position.get(topic));
+    }
+
+    private static ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> deepCopy(
+        final Map<String, ? extends Map<Integer, Long>> map) {
+        if (map == null) {
+            return new ConcurrentHashMap<>();
+        } else {
+            final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> copy =
+                new ConcurrentHashMap<>(map.size());
+            for (final Entry<String, ? extends Map<Integer, Long>> entry : map.entrySet()) {
+                copy.put(entry.getKey(), new ConcurrentHashMap<>(entry.getValue()));
+            }
+            return copy;
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "Position{" +
+            "position=" + position +
+            '}';
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final Position position1 = (Position) o;
+        return Objects.equals(position, position1.position);
+    }
+
+    @Override
+    public int hashCode() {
+        throw new UnsupportedOperationException(
+            "This mutable object is not suitable as a hash key");
+    }
+}
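
A short sketch of the merge semantics (the topic name "input" is arbitrary): merge keeps the larger offset per topic-partition, and fromMap/copy share no structure with their sources.

    final Position a = Position.emptyPosition()
        .withComponent("input", 0, 5L)
        .withComponent("input", 1, 10L);
    final Position b = Position.fromMap(
        Collections.singletonMap("input", Collections.singletonMap(0, 7L)));

    final Position merged = a.merge(b);
    // merged maps "input" to {0 -> 7, 1 -> 10}: the larger offset 7 wins
    // for partition 0, and partition 1 carries over unchanged.
    assert merged.getBound("input").get(0) == 7L;
    assert merged.getBound("input").get(1) == 10L;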
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java b/streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java
new file mode 100644
index 0000000..46d9376
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+import java.util.Objects;
+
+/**
+ * A class bounding the processing state {@link Position} during queries. This can be used to
+ * specify that a query should fail if the locally available partition isn't caught up to the
+ * specified bound. "Unbounded" places no restrictions on the current location of the partition.
+ */
+@Evolving
+public class PositionBound {
+
+    private final Position position;
+    private final boolean unbounded;
+
+    private PositionBound(final Position position, final boolean unbounded) {
+        if (unbounded && position != null) {
+            throw new IllegalArgumentException();
+        } else if (position != null) {
+            this.position = position.copy();
+            this.unbounded = false;
+        } else {
+            this.position = null;
+            this.unbounded = unbounded;
+        }
+    }
+
+    /**
+     * Creates a new PositionBound representing "no bound".
+     */
+    public static PositionBound unbounded() {
+        return new PositionBound(null, true);
+    }
+
+    /**
+     * Creates a new PositionBound representing a specific position.
+     */
+    public static PositionBound at(final Position position) {
+        return new PositionBound(position, false);
+    }
+
+    /**
+     * Returns true iff this object specifies that there is no position bound.
+     */
+    public boolean isUnbounded() {
+        return unbounded;
+    }
+
+    /**
+     * Returns the specific position of this bound.
+     *
+     * @throws IllegalArgumentException if this is an "unbounded" position.
+     */
+    public Position position() {
+        if (unbounded) {
+            throw new IllegalArgumentException(
+                "Cannot get the position of an unbounded PositionBound."
+            );
+        } else {
+            return position;
+        }
+    }
+
+    @Override
+    public String toString() {
+        if (isUnbounded()) {
+            return "PositionBound{unbounded}";
+        } else {
+            return "PositionBound{position=" + position + '}';
+        }
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final PositionBound that = (PositionBound) o;
+        return unbounded == that.unbounded && Objects.equals(position, that.position);
+    }
+
+    @Override
+    public int hashCode() {
+        throw new UnsupportedOperationException(
+            "This mutable object is not suitable as a hash key");
+    }
+}
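
The two flavors of bound, sketched:

    final PositionBound any = PositionBound.unbounded();
    final PositionBound caughtUp = PositionBound.at(
        Position.emptyPosition().withComponent("input", 0, 42L));

    assert any.isUnbounded();
    assert !caughtUp.isUnbounded();
    // caughtUp.position() returns the (copied) Position, while
    // any.position() would throw IllegalArgumentException.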
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/Query.java b/streams/src/main/java/org/apache/kafka/streams/query/Query.java
new file mode 100644
index 0000000..ad77a91
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/query/Query.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+/**
+ * Marker interface that all interactive queries must implement (see {@link
+ * org.apache.kafka.streams.KafkaStreams#query(StateQueryRequest)}).
+ * <p>
+ * You can find all available queries by searching for classes implementing this interface.
+ * <p>
+ * Kafka Streams will pass unknown query types straight through into the bytes stores, so callers
+ * can add custom queries by implementing this interface and providing custom stores that handle
+ * them (via {@link org.apache.kafka.streams.state.StoreSupplier}s).
+ * <p>
+ * See KIP-796 (https://cwiki.apache.org/confluence/x/34xnCw) for more details.
+ *
+ * @param <R> The type of the result returned by this query.
+ */
+public interface Query<R> {
+
+}
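
Because Query is a plain marker interface, a custom query is just a value object that a cooperating store knows how to answer. A hypothetical example:

    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.query.Query;

    // A custom point-lookup query; only stores that recognize it will
    // answer, all others return UNKNOWN_QUERY_TYPE.
    public class MyKeyQuery implements Query<byte[]> {
        private final Bytes key;

        public MyKeyQuery(final Bytes key) {
            this.key = key;
        }

        public Bytes getKey() {
            return key;
        }
    }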
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java b/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
new file mode 100644
index 0000000..780ea86
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+import org.apache.kafka.streams.processor.StateStore;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Container for a single partition's result when executing a {@link StateQueryRequest}.
+ *
+ * @param <R> The result type of the query.
+ */
+public final class QueryResult<R> {
+
+    private final List<String> executionInfo = new LinkedList<>();
+    private final FailureReason failureReason;
+    private final String failure;
+    private final R result;
+    private Position position;
+
+    private QueryResult(final R result) {
+        this.result = result;
+        this.failureReason = null;
+        this.failure = null;
+    }
+
+    private QueryResult(final FailureReason failureReason, final String failure) {
+        this.result = null;
+        this.failureReason = failureReason;
+        this.failure = failure;
+    }
+
+    /**
+     * Static factory method to create a result object for a successful query. Used by StateStores
+     * to respond to a {@link StateStore#query(Query, PositionBound, boolean)}.
+     */
+    public static <R> QueryResult<R> forResult(final R result) {
+        return new QueryResult<>(result);
+    }
+
+    /**
+     * Static factory method to create a result object for a failed query. Used by StateStores to
+     * respond to a {@link StateStore#query(Query, PositionBound, boolean)}.
+     */
+    public static <R> QueryResult<R> forFailure(
+        final FailureReason failureReason,
+        final String failureMessage) {
+
+        return new QueryResult<>(failureReason, failureMessage);
+    }
+
+    /**
+     * Static factory method to create a failed query result object to indicate that the store does
+     * not know how to handle the query.
+     * <p>
+     * Used by StateStores to respond to a {@link StateStore#query(Query, PositionBound, boolean)}.
+     */
+    public static <R> QueryResult<R> forUnknownQueryType(
+        final Query<R> query,
+        final StateStore store) {
+
+        return new QueryResult<>(
+            FailureReason.UNKNOWN_QUERY_TYPE,
+            "This store (" + store.getClass() + ") doesn't know how to execute 
"
+                + "the given query (" + query + ")." +
+                " Contact the store maintainer if you need support for a new 
query type.");
+    }
+
+    /**
+     * Static factory method to create a failed query result object to indicate that the store has
+     * not yet caught up to the requested position bound.
+     * <p>
+     * Used by StateStores to respond to a {@link StateStore#query(Query, PositionBound, boolean)}.
+     */
+    public static <R> QueryResult<R> notUpToBound(
+        final Position currentPosition,
+        final PositionBound positionBound,
+        final int partition) {
+
+        return new QueryResult<>(
+            FailureReason.NOT_UP_TO_BOUND,
+            "For store partition " + partition + ", the current position "
+                + currentPosition + " is not yet up to the bound "
+                + positionBound
+        );
+    }
+
+    /**
+     * Used by stores to add detailed execution information (if requested) during query execution.
+     */
+    public void addExecutionInfo(final String message) {
+        executionInfo.add(message);
+    }
+
+    /**
+     * Used by stores to report what exact position in the store's history it was at when it
+     * executed the query.
+     */
+    public void setPosition(final Position position) {
+        this.position = position;
+    }
+
+    /**
+     * True iff the query was successfully executed. The response is available in {@link
+     * this#getResult()}.
+     */
+    public boolean isSuccess() {
+        return failureReason == null;
+    }
+
+
+    /**
+     * True iff the query execution failed. More information about the failure is available in
+     * {@link this#getFailureReason()} and {@link this#getFailureMessage()}.
+     */
+    public boolean isFailure() {
+        return failureReason != null;
+    }
+
+    /**
+     * If detailed execution information was requested in {@link StateQueryRequest#enableExecutionInfo()},
+     * this method returns the execution details for this partition's result.
+     */
+    public List<String> getExecutionInfo() {
+        return executionInfo;
+    }
+
+    /**
+     * This state partition's exact position in its history when this query was executed. Can be
+     * used in conjunction with subsequent queries via {@link StateQueryRequest#withPositionBound(PositionBound)}.
+     * <p>
+     * Note: stores are encouraged, but not required to set this property.
+     */
+    public Position getPosition() {
+        return position;
+    }
+
+    /**
+     * If this partition failed to execute the query, returns the reason.
+     *
+     * @throws IllegalArgumentException if this is not a failed result.
+     */
+    public FailureReason getFailureReason() {
+        if (!isFailure()) {
+            throw new IllegalArgumentException(
+                "Cannot get failure reason because this query did not fail."
+            );
+        }
+        return failureReason;
+    }
+
+    /**
+     * If this partition failed to execute the query, returns the failure 
message.
+     *
+     * @throws IllegalArgumentException if this is not a failed result.
+     */
+    public String getFailureMessage() {
+        if (!isFailure()) {
+            throw new IllegalArgumentException(
+                "Cannot get failure message because this query did not fail."
+            );
+        }
+        return failure;
+    }
+
+    /**
+     * Returns the result of executing the query on one partition. The result type is determined by
+     * the query. Note: queries may choose to return {@code null} for a successful query, so {@link
+     * this#isSuccess()} and {@link this#isFailure()} must be used to determine whether the query
+     * was successful or failed on this partition.
+     *
+     * @throws IllegalArgumentException if this is not a successful query.
+     */
+    public R getResult() {
+        if (!isSuccess()) {
+            throw new IllegalArgumentException(
+                "Cannot get result for failed query. Failure is " + 
failureReason.name() + ": "
+                    + failure);
+        }
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "QueryResult{" +
+            "executionInfo=" + executionInfo +
+            ", failureReason=" + failureReason +
+            ", failure='" + failure + '\'' +
+            ", result=" + result +
+            ", position=" + position +
+            '}';
+    }
+}
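
Store-side construction, sketched (currentPosition and requestedBound stand for state the store tracks itself):

    final QueryResult<Long> ok = QueryResult.forResult(42L);
    ok.setPosition(currentPosition.copy()); // pass a copy, per the Position javadoc

    final QueryResult<Long> behind =
        QueryResult.notUpToBound(currentPosition, requestedBound, 0);

    assert ok.isSuccess() && behind.isFailure();
    assert behind.getFailureReason() == FailureReason.NOT_UP_TO_BOUND;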
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/StateQueryRequest.java b/streams/src/main/java/org/apache/kafka/streams/query/StateQueryRequest.java
new file mode 100644
index 0000000..eb99b79
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/query/StateQueryRequest.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * The request object for Interactive Queries. This is an immutable builder class for passing all
+ * required and optional arguments for querying a state store in Kafka Streams.
+ * <p>
+ *
+ * @param <R> The type of the query result.
+ */
+@Evolving
+public class StateQueryRequest<R> {
+
+    private final String storeName;
+    private final PositionBound position;
+    private final Optional<Set<Integer>> partitions;
+    private final Query<R> query;
+    private final boolean executionInfoEnabled;
+    private final boolean requireActive;
+
+    private StateQueryRequest(
+        final String storeName,
+        final PositionBound position,
+        final Optional<Set<Integer>> partitions,
+        final Query<R> query,
+        final boolean executionInfoEnabled,
+        final boolean requireActive) {
+
+        this.storeName = storeName;
+        this.position = position;
+        this.partitions = partitions;
+        this.query = query;
+        this.executionInfoEnabled = executionInfoEnabled;
+        this.requireActive = requireActive;
+    }
+
+    /**
+     * Specifies the name of the store to query.
+     */
+    public static InStore inStore(final String name) {
+        return new InStore(name);
+    }
+
+    /**
+     * Bounds the position of the state store against its input topics.
+     */
+    public StateQueryRequest<R> withPositionBound(final PositionBound positionBound) {
+        return new StateQueryRequest<>(
+            storeName,
+            positionBound,
+            partitions,
+            query,
+            executionInfoEnabled,
+            requireActive
+        );
+    }
+
+
+    /**
+     * Specifies that the query will run against all locally available partitions.
+     */
+    public StateQueryRequest<R> withAllPartitions() {
+        return new StateQueryRequest<>(
+            storeName,
+            position,
+            Optional.empty(),
+            query,
+            executionInfoEnabled,
+            requireActive
+        );
+    }
+
+    /**
+     * Specifies a set of partitions to run against. If some partitions are not locally available,
+     * the response will contain a {@link FailureReason#NOT_PRESENT} for those partitions. If some
+     * partitions in this set are not valid partitions for the store, the response will contain a
+     * {@link FailureReason#DOES_NOT_EXIST} for those partitions.
+     */
+    public StateQueryRequest<R> withPartitions(final Set<Integer> partitions) {
+        return new StateQueryRequest<>(
+            storeName,
+            position,
+            Optional.of(Collections.unmodifiableSet(new HashSet<>(partitions))),
+            query,
+            executionInfoEnabled,
+            requireActive
+        );
+    }
+
+    /**
+     * Requests for stores and the Streams runtime to record any useful details about how the query
+     * was executed.
+     */
+    public StateQueryRequest<R> enableExecutionInfo() {
+        return new StateQueryRequest<>(
+            storeName,
+            position,
+            partitions,
+            query,
+            true,
+            requireActive
+        );
+    }
+
+    /**
+     * Specifies that this query should only run on partitions for which this instance is the leader
+     * (aka "active"). Partitions for which this instance is not the active replica will return
+     * {@link FailureReason#NOT_UP_TO_BOUND}.
+     */
+    public StateQueryRequest<R> requireActive() {
+        return new StateQueryRequest<>(
+            storeName,
+            position,
+            partitions,
+            query,
+            executionInfoEnabled,
+            true
+        );
+    }
+
+    /**
+     * The name of the store this request is for.
+     */
+    public String getStoreName() {
+        return storeName;
+    }
+
+    /**
+     * The bound that this request places on its query, in terms of the partitions' positions
+     * against its inputs.
+     */
+    public PositionBound getPositionBound() {
+        return position;
+    }
+
+    /**
+     * The query this request is meant to run.
+     */
+    public Query<R> getQuery() {
+        return query;
+    }
+
+    /**
+     * Whether this request should fetch from all locally available partitions.
+     */
+    public boolean isAllPartitions() {
+        return !partitions.isPresent();
+    }
+
+    /**
+     * If the request is for specific partitions, return the set of partitions to query.
+     *
+     * @throws IllegalStateException if this is a request for all partitions
+     */
+    public Set<Integer> getPartitions() {
+        if (!partitions.isPresent()) {
+            throw new IllegalStateException(
+                "Cannot list partitions of an 'all partitions' request");
+        } else {
+            return partitions.get();
+        }
+    }
+
+    /**
+     * Whether the request includes detailed execution information.
+     */
+    public boolean executionInfoEnabled() {
+        return executionInfoEnabled;
+    }
+
+    /**
+     * Whether this request requires the query to execute only on active partitions.
+     */
+    public boolean isRequireActive() {
+        return requireActive;
+    }
+
+    /**
+     * A progressive builder interface for creating {@code StateQueryRequest}s.
+     */
+    public static class InStore {
+
+        private final String name;
+
+        private InStore(final String name) {
+            this.name = name;
+        }
+
+        /**
+         * Specifies the query to run on the specified store.
+         */
+        public <R> StateQueryRequest<R> withQuery(final Query<R> query) {
+            return new StateQueryRequest<>(
+                name, // name is already specified
+                PositionBound.unbounded(), // default: unbounded
+                Optional.empty(), // default: all partitions
+                query, // the query is specified
+                false, // default: no execution info
+                false // default: don't require active
+            );
+        }
+    }
+}
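
The whole builder chain in one place, reusing the hypothetical MyKeyQuery from the earlier sketch (the store name, key, and lastObserved position are also hypothetical; each call returns a fresh immutable request):

    final StateQueryRequest<byte[]> request =
        StateQueryRequest.inStore("my-store")                   // required
            .withQuery(new MyKeyQuery(key))                     // required
            .withPositionBound(PositionBound.at(lastObserved))  // default: unbounded
            .withPartitions(new HashSet<>(Arrays.asList(0, 1))) // default: all partitions
            .enableExecutionInfo()                              // default: off
            .requireActive();                                   // default: any replica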
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java b/streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java
new file mode 100644
index 0000000..8b93bd6
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * The response object for interactive queries. This wraps the individual partition results, as well
+ * as metadata relating to the result as a whole.
+ * <p>
+ *
+ * @param <R> The type of the query result.
+ */
+@Evolving
+public class StateQueryResult<R> {
+
+    private final Map<Integer, QueryResult<R>> partitionResults = new HashMap<>();
+    private QueryResult<R> globalResult = null;
+
+    /**
+     * Set the result for a global store query. Used by Kafka Streams and available for tests.
+     */
+    public void setGlobalResult(final QueryResult<R> r) {
+        this.globalResult = r;
+    }
+
+    /**
+     * Set the result for a partitioned store query. Used by Kafka Streams and available for tests.
+     */
+    public void addResult(final int partition, final QueryResult<R> r) {
+        partitionResults.put(partition, r);
+    }
+
+
+    /**
+     * The query's result for each partition that executed the query. Empty for global store
+     * queries.
+     */
+    public Map<Integer, QueryResult<R>> getPartitionResults() {
+        return partitionResults;
+    }
+
+    /**
+     * For queries that are expected to match records in only one partition, returns the result.
+     *
+     * @throws IllegalArgumentException if the results are not for exactly one partition.
+     */
+    public QueryResult<R> getOnlyPartitionResult() {
+        final List<QueryResult<R>> nonempty =
+            partitionResults
+                .values()
+                .stream()
+                .filter(r -> r.getResult() != null)
+                .collect(Collectors.toList());
+
+        if (nonempty.size() != 1) {
+            throw new IllegalArgumentException(
+                "The query did not return exactly one partition result: " + 
partitionResults
+            );
+        } else {
+            return nonempty.get(0);
+        }
+    }
+
+    /**
+     * The query's result for global store queries. Is {@code null} for non-global (partitioned)
+     * store queries.
+     */
+    public QueryResult<R> getGlobalResult() {
+        return globalResult;
+    }
+
+    /**
+     * The position of the state store at the moment it executed the query. In conjunction with
+     * {@link StateQueryRequest#withPositionBound}, this can be used to achieve a good balance
+     * between consistency and availability in which repeated queries are guaranteed to advance in
+     * time while allowing reads to be served from any replica that is caught up to that caller's
+     * prior observations.
+     */
+    public Position getPosition() {
+        Position position = Position.emptyPosition();
+        for (final QueryResult<R> r : partitionResults.values()) {
+            position = position.merge(r.getPosition());
+        }
+        return position;
+    }
+
+    @Override
+    public String toString() {
+        return "StateQueryResult{" +
+            "partitionResults=" + partitionResults +
+            ", globalResult=" + globalResult +
+            '}';
+    }
+}
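
A sketch of the consistency pattern that the getPosition() javadoc describes, reusing the request and kafkaStreams names from the first sketch:

    final StateQueryResult<byte[]> first = kafkaStreams.query(request);
    final Position observed = first.getPosition();

    // Any replica still behind `observed` answers NOT_UP_TO_BOUND, so a
    // sequence of queries can only move forward in time, whichever
    // replica happens to serve each one.
    final StateQueryRequest<byte[]> next =
        request.withPositionBound(PositionBound.at(observed));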
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
index f9f0bdc..da7927e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
@@ -212,4 +212,13 @@ public final class StateSerdes<K, V> {
                     e);
         }
     }
+
+    @Override
+    public String toString() {
+        return "StateSerdes{" +
+            "topic='" + topic + '\'' +
+            ", keySerde=" + keySerde +
+            ", valueSerde=" + valueSerde +
+            '}';
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index bf4e5aa..17fbbb4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -19,7 +19,7 @@ package org.apache.kafka.streams.state;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.internals.InMemorySessionBytesStoreSupplier;
 import org.apache.kafka.streams.state.internals.InMemoryWindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
@@ -122,22 +122,7 @@ public final class Stores {
      */
     public static KeyValueBytesStoreSupplier inMemoryKeyValueStore(final String name) {
         Objects.requireNonNull(name, "name cannot be null");
-        return new KeyValueBytesStoreSupplier() {
-            @Override
-            public String name() {
-                return name;
-            }
-
-            @Override
-            public KeyValueStore<Bytes, byte[]> get() {
-                return new InMemoryKeyValueStore(name);
-            }
-
-            @Override
-            public String metricsScope() {
-                return "in-memory";
-            }
-        };
+        return new InMemoryKeyValueBytesStoreSupplier(name);
     }
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueBytesStoreSupplier.java
new file mode 100644
index 0000000..ef0f96a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueBytesStoreSupplier.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+public class InMemoryKeyValueBytesStoreSupplier implements KeyValueBytesStoreSupplier {
+
+    private final String name;
+
+    public InMemoryKeyValueBytesStoreSupplier(final String name) {
+        this.name = name;
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public KeyValueStore<Bytes, byte[]> get() {
+        return new InMemoryKeyValueStore(name);
+    }
+
+    @Override
+    public String metricsScope() {
+        return "in-memory";
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index cd6d29f..7b190c1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -24,6 +24,9 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.slf4j.Logger;
@@ -90,6 +93,20 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
     }
 
     @Override
+    public <R> QueryResult<R> query(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        return StoreQueryUtils.handleBasicQueries(
+            query,
+            positionBound,
+            collectExecutionInfo,
+            this
+        );
+    }
+
+    @Override
     public synchronized byte[] get(final Bytes key) {
         return map.get(key);
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
index bc8cda6..03b4594 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
@@ -27,6 +27,9 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
 import org.slf4j.Logger;
@@ -292,6 +295,12 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
     }
 
     @Override
+    public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+        return StoreQueryUtils.handleBasicQueries(query, positionBound, collectExecutionInfo, this);
+    }
+
+    @Override
     public void flush() {
         // do-nothing since it is in-memory
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
index 5327e75..b07ded5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -27,6 +27,9 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
@@ -328,6 +331,17 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
     }
 
     @Override
+    public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+        return StoreQueryUtils.handleBasicQueries(
+            query,
+            positionBound,
+            collectExecutionInfo,
+            this
+        );
+    }
+
+    @Override
     public void flush() {
         // do-nothing since it is in-memory
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
index d9b42c2..e9cdece 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
@@ -22,6 +22,9 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -116,6 +119,24 @@ public class KeyValueToTimestampedKeyValueByteStoreAdapter implements KeyValueSt
     }
 
     @Override
+    public <R> QueryResult<R> query(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+
+        final long start = collectExecutionInfo ? System.nanoTime() : -1L;
+        final QueryResult<R> result = store.query(query, positionBound, collectExecutionInfo);
+        if (collectExecutionInfo) {
+            final long end = System.nanoTime();
+            result.addExecutionInfo(
+                "Handled in " + getClass() + " in " + (end - start) + "ns"
+            );
+        }
+        return result;
+    }
+
+    @Override
     public byte[] get(final Bytes key) {
         return convertToTimestampedFormat(store.get(key));
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
index 84f46ad..eda880e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
@@ -19,6 +19,9 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -108,6 +111,20 @@ public class MemoryNavigableLRUCache extends MemoryLRUCache {
         return new TreeMap<>(this.map);
     }
 
+    @Override
+    public <R> QueryResult<R> query(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        return StoreQueryUtils.handleBasicQueries(
+            query,
+            positionBound,
+            collectExecutionInfo,
+            this
+        );
+    }
+
 
     private static class CacheIterator implements KeyValueIterator<Bytes, byte[]> {
         private final Iterator<Bytes> keys;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/PingQuery.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/PingQuery.java
new file mode 100644
index 0000000..1eaf128
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/PingQuery.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.query.Query;
+
+/**
+ * A very simple query that all stores can handle to verify that the store is participating in the
+ * IQv2 framework properly.
+ * <p>
+ * This is not a public API and may change without notice.
+ */
+public class PingQuery implements Query<Boolean> {
+
+}
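
Note: a usage sketch for PingQuery, mirroring the integration tests below; the
"kv-store" name and the running KafkaStreams instance are assumptions of the
sketch, not part of this patch:

    final StateQueryRequest<Boolean> request =
        StateQueryRequest.inStore("kv-store").withQuery(new PingQuery());
    final StateQueryResult<Boolean> result = kafkaStreams.query(request);
    result.getPartitionResults().forEach((partition, queryResult) -> {
        if (queryResult.isSuccess()) {
            // A successful ping always carries the result `true`.
            System.out.println(partition + " -> " + queryResult.getResult());
        }
    });
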
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
index f5d7108..a7f7eb3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
@@ -18,6 +18,9 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
 
@@ -31,6 +34,12 @@ public class RocksDBSessionStore
     }
 
     @Override
+    public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+        return StoreQueryUtils.handleBasicQueries(query, positionBound, collectExecutionInfo, this);
+    }
+
+    @Override
     public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes key,
                                                                   final long earliestSessionEndTime,
                                                                   final long latestSessionStartTime) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 93dd93b..117b9bd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -29,6 +29,9 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.RocksDBConfigSetter;
@@ -109,6 +112,17 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
     private StateStoreContext context;
     private Position position;
 
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+        return StoreQueryUtils.handleBasicQueries(
+            query,
+            positionBound,
+            collectExecutionInfo,
+            this
+        );
+    }
+
     RocksDBStore(final String name,
                  final String metricsScope) {
         this(name, DB_FILE_DIR, new RocksDBMetricsRecorder(metricsScope, name));
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index 8f48dca..3e7a43f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -18,6 +18,9 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
@@ -112,4 +115,10 @@ public class RocksDBWindowStore
             seqnum = (seqnum + 1) & 0x7FFFFFFF;
         }
     }
+
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+        return StoreQueryUtils.handleBasicQueries(query, positionBound, collectExecutionInfo, this);
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
new file mode 100644
index 0000000..b1d448a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
+
+public final class StoreQueryUtils {
+
+    // make this class uninstantiable
+    private StoreQueryUtils() {
+    }
+
+    @SuppressWarnings("unchecked")
+    public static <R> QueryResult<R> handleBasicQueries(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo,
+        final StateStore store) {
+
+        final QueryResult<R> result;
+        final long start = collectExecutionInfo ? System.nanoTime() : -1L;
+        // TODO: position tracking
+        if (query instanceof PingQuery) {
+            result = (QueryResult<R>) QueryResult.forResult(true);
+        } else {
+            result = QueryResult.forUnknownQueryType(query, store);
+        }
+        if (collectExecutionInfo) {
+            result.addExecutionInfo(
+                "Handled in " + store.getClass() + " in " + (System.nanoTime() 
- start) + "ns"
+            );
+        }
+        return result;
+    }
+}
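
Note: every store in this patch participates in IQv2 by delegating to
StoreQueryUtils#handleBasicQueries. A custom store could add the same override;
a sketch, assuming a class that already implements StateStore:

    @Override
    public <R> QueryResult<R> query(final Query<R> query,
        final PositionBound positionBound,
        final boolean collectExecutionInfo) {
        // PingQuery is answered affirmatively; any other query type is returned
        // as an UNKNOWN_QUERY_TYPE failure rather than thrown as an exception.
        return StoreQueryUtils.handleBasicQueries(query, positionBound, collectExecutionInfo, this);
    }
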
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
index a249a14..f6b3447 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
@@ -24,6 +24,9 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -185,6 +188,20 @@ public class TimestampedKeyValueStoreBuilder<K, V>
         }
 
         @Override
+        public <R> QueryResult<R> query(final Query<R> query,
+            final PositionBound positionBound,
+            final boolean collectExecutionInfo) {
+
+            final long start = collectExecutionInfo ? System.nanoTime() : -1L;
+            final QueryResult<R> result = wrapped.query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                final long end = System.nanoTime();
+                result.addExecutionInfo("Handled in " + getClass() + " in " + (end - start) + "ns");
+            }
+            return result;
+        }
+
+        @Override
         public String name() {
             return wrapped.name();
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
index b3727f5..840274a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
@@ -23,6 +23,9 @@ import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.TimestampedBytesStore;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
@@ -203,6 +206,20 @@ public class TimestampedWindowStoreBuilder<K, V>
         }
 
         @Override
+        public <R> QueryResult<R> query(final Query<R> query,
+            final PositionBound positionBound,
+            final boolean collectExecutionInfo) {
+
+            final long start = collectExecutionInfo ? System.nanoTime() : -1L;
+            final QueryResult<R> result = wrapped.query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                final long end = System.nanoTime();
+                result.addExecutionInfo("Handled in " + getClass() + " in " + (end - start) + "ns");
+            }
+            }
+            return result;
+        }
+
+        @Override
         public String name() {
             return wrapped.name();
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
index f7999d3..88549ae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
@@ -21,6 +21,9 @@ import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
@@ -183,6 +186,23 @@ class WindowToTimestampedWindowByteStoreAdapter implements WindowStore<Bytes, by
         return store.isOpen();
     }
 
+    @Override
+    public <R> QueryResult<R> query(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = collectExecutionInfo ? System.nanoTime() : -1L;
+        final QueryResult<R> result = store.query(query, positionBound, collectExecutionInfo);
+        if (collectExecutionInfo) {
+            final long end = System.nanoTime();
+            result.addExecutionInfo(
+                "Handled in " + getClass() + " in " + (end - start) + "ns"
+            );
+        }
+        return result;
+    }
+
 
     private static class WindowToTimestampedWindowIteratorAdapter
         extends KeyValueToTimestampedKeyValueIteratorAdapter<Long>
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
index e8244f7..7531b38 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
@@ -20,6 +20,9 @@ import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.TimestampedBytesStore;
 
 /**
@@ -103,6 +106,22 @@ public abstract class WrappedStateStore<S extends StateStore, K, V> implements S
         wrapped.close();
     }
 
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = collectExecutionInfo ? System.nanoTime() : -1L;
+        final QueryResult<R> result = wrapped().query(query, positionBound, collectExecutionInfo);
+        if (collectExecutionInfo) {
+            final long end = System.nanoTime();
+            result.addExecutionInfo(
+                "Handled in " + getClass() + " via WrappedStateStore" + " in " 
+ (end - start)
+                    + "ns");
+        }
+        return result;
+    }
+
     public S wrapped() {
         return wrapped;
     }
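
Note: because each wrapper layer appends its own timing entry before returning,
enabling execution info on a request yields one entry per layer that handled
the query. A collection sketch (store name illustrative, not part of this
patch):

    final StateQueryRequest<Boolean> request =
        StateQueryRequest.inStore("kv-store").withQuery(new PingQuery()).enableExecutionInfo();
    final StateQueryResult<Boolean> result = kafkaStreams.query(request);
    // Entries take the form "Handled in <class> via WrappedStateStore in <n>ns".
    result.getPartitionResults().values().forEach(queryResult ->
        queryResult.getExecutionInfo().forEach(System.out::println));
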
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
new file mode 100644
index 0000000..7d33c74
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsNotStartedException;
+import org.apache.kafka.streams.errors.StreamsStoppedException;
+import org.apache.kafka.streams.errors.UnknownStateStoreException;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.apache.kafka.streams.processor.internals.StreamThread.State;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.StateQueryRequest;
+import org.apache.kafka.streams.query.StateQueryResult;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.internals.PingQuery;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static java.util.Collections.singleton;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.streams.query.StateQueryRequest.inStore;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.matchesPattern;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+@Category({IntegrationTest.class})
+public class IQv2IntegrationTest {
+
+    private static final int NUM_BROKERS = 1;
+    public static final Duration WINDOW_SIZE = Duration.ofMinutes(5);
+    private static int port = 0;
+    private static final String INPUT_TOPIC_NAME = "input-topic";
+    private static final Position INPUT_POSITION = Position.emptyPosition();
+    private static final String STORE_NAME = "kv-store";
+
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+
+    private KafkaStreams kafkaStreams;
+
+    @BeforeClass
+    public static void before()
+        throws InterruptedException, IOException, ExecutionException, TimeoutException {
+        CLUSTER.start();
+        final int partitions = 2;
+        CLUSTER.createTopic(INPUT_TOPIC_NAME, partitions, 1);
+
+        final Properties producerProps = new Properties();
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+
+        final List<Future<RecordMetadata>> futures = new LinkedList<>();
+        try (final Producer<Integer, Integer> producer = new KafkaProducer<>(producerProps)) {
+            for (int i = 0; i < 3; i++) {
+                final Future<RecordMetadata> send = producer.send(
+                    new ProducerRecord<>(
+                        INPUT_TOPIC_NAME,
+                        i % partitions,
+                        Time.SYSTEM.milliseconds(),
+                        i,
+                        i,
+                        null
+                    )
+                );
+                futures.add(send);
+                Time.SYSTEM.sleep(1L);
+            }
+            producer.flush();
+
+            for (final Future<RecordMetadata> future : futures) {
+                final RecordMetadata recordMetadata = future.get(1, TimeUnit.MINUTES);
+                assertThat(recordMetadata.hasOffset(), is(true));
+                INPUT_POSITION.withComponent(
+                    recordMetadata.topic(),
+                    recordMetadata.partition(),
+                    recordMetadata.offset()
+                );
+            }
+        }
+
+        assertThat(INPUT_POSITION, equalTo(
+            Position
+                .emptyPosition()
+                .withComponent(INPUT_TOPIC_NAME, 0, 1L)
+                .withComponent(INPUT_TOPIC_NAME, 1, 0L)
+        ));
+    }
+
+    @Before
+    public void beforeTest() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        builder.table(
+            INPUT_TOPIC_NAME,
+            Consumed.with(Serdes.Integer(), Serdes.Integer()),
+            Materialized.as(STORE_NAME)
+        );
+
+        kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration());
+        kafkaStreams.cleanUp();
+    }
+
+    @After
+    public void afterTest() {
+        kafkaStreams.close();
+        kafkaStreams.cleanUp();
+    }
+
+    @AfterClass
+    public static void after() {
+        CLUSTER.stop();
+    }
+
+    @Test
+    public void shouldFailUnknownStore() {
+        final PingQuery query = new PingQuery();
+        final StateQueryRequest<Boolean> request =
+            inStore("unknown-store").withQuery(query);
+
+        assertThrows(UnknownStateStoreException.class, () -> kafkaStreams.query(request));
+    }
+
+    @Test
+    public void shouldFailNotStarted() {
+        final PingQuery query = new PingQuery();
+        final StateQueryRequest<Boolean> request =
+            inStore(STORE_NAME).withQuery(query);
+
+        assertThrows(StreamsNotStartedException.class, () -> kafkaStreams.query(request));
+    }
+
+    @Test
+    public void shouldFailStopped() {
+        final PingQuery query = new PingQuery();
+        final StateQueryRequest<Boolean> request =
+            inStore(STORE_NAME).withQuery(query);
+
+        kafkaStreams.start();
+        kafkaStreams.close(Duration.ZERO);
+        assertThrows(StreamsStoppedException.class, () -> kafkaStreams.query(request));
+    }
+
+    @Test
+    public void shouldRejectNonRunningActive()
+        throws NoSuchFieldException, IllegalAccessException {
+        final PingQuery query = new PingQuery();
+        final StateQueryRequest<Boolean> request =
+            inStore(STORE_NAME).withQuery(query).requireActive();
+        final Set<Integer> partitions = mkSet(0, 1);
+
+        kafkaStreams.start();
+
+        final Field threadsField = KafkaStreams.class.getDeclaredField("threads");
+        threadsField.setAccessible(true);
+        @SuppressWarnings("unchecked") final List<StreamThread> threads =
+            (List<StreamThread>) threadsField.get(kafkaStreams);
+        final StreamThread streamThread = threads.get(0);
+
+        final Field stateLock = StreamThread.class.getDeclaredField("stateLock");
+        stateLock.setAccessible(true);
+        final Object lock = stateLock.get(streamThread);
+
+        // wait for the desired partitions to be assigned
+        IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(
+            kafkaStreams,
+            inStore(STORE_NAME).withQuery(query),
+            partitions
+        );
+
+        // then lock the thread state, change it, and make our assertions.
+        synchronized (lock) {
+            final Field stateField = StreamThread.class.getDeclaredField("state");
+            stateField.setAccessible(true);
+            stateField.set(streamThread, State.PARTITIONS_ASSIGNED);
+
+            final StateQueryResult<Boolean> result =
+                IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(
+                    kafkaStreams,
+                    request,
+                    partitions
+                );
+
+            assertThat(result.getPartitionResults().keySet(), is(partitions));
+            for (final Integer partition : partitions) {
+                assertThat(result.getPartitionResults().get(partition).isFailure(), is(true));
+                assertThat(
+                    result.getPartitionResults().get(partition).getFailureReason(),
+                    is(FailureReason.NOT_UP_TO_BOUND)
+                );
+                assertThat(
+                    result.getPartitionResults().get(partition).getFailureMessage(),
+                    is("Query requires a running active task,"
+                        + " but partition was in state PARTITIONS_ASSIGNED and was active.")
+                );
+            }
+        }
+    }
+
+    @Test
+    public void shouldFetchFromPartition() {
+        final PingQuery query = new PingQuery();
+        final int partition = 1;
+        final Set<Integer> partitions = singleton(partition);
+        final StateQueryRequest<Boolean> request =
+            inStore(STORE_NAME).withQuery(query).withPartitions(partitions);
+
+        kafkaStreams.start();
+        final StateQueryResult<Boolean> result =
+            IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(kafkaStreams, request, partitions);
+
+        assertThat(result.getPartitionResults().keySet(), equalTo(partitions));
+    }
+
+    @Test
+    public void shouldFetchExplicitlyFromAllPartitions() {
+        final PingQuery query = new PingQuery();
+        final Set<Integer> partitions = mkSet(0, 1);
+        final StateQueryRequest<Boolean> request =
+            inStore(STORE_NAME).withQuery(query).withAllPartitions();
+
+        kafkaStreams.start();
+        final StateQueryResult<Boolean> result =
+            IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(kafkaStreams, request, partitions);
+
+        assertThat(result.getPartitionResults().keySet(), equalTo(partitions));
+    }
+
+    @Test
+    public void shouldNotRequireQueryHandler() {
+        final PingQuery query = new PingQuery();
+        final int partition = 1;
+        final Set<Integer> partitions = singleton(partition);
+        final StateQueryRequest<Boolean> request =
+            inStore(STORE_NAME).withQuery(query).withPartitions(partitions);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        builder.table(
+            INPUT_TOPIC_NAME,
+            Consumed.with(Serdes.Integer(), Serdes.Integer()),
+            Materialized.as(new KeyValueBytesStoreSupplier() {
+                @Override
+                public String name() {
+                    return STORE_NAME;
+                }
+
+                @Override
+                public KeyValueStore<Bytes, byte[]> get() {
+                    return new KeyValueStore<Bytes, byte[]>() {
+                        private boolean open = false;
+                        private Map<Bytes, byte[]> map = new HashMap<>();
+
+                        @Override
+                        public void put(final Bytes key, final byte[] value) {
+                            map.put(key, value);
+                        }
+
+                        @Override
+                        public byte[] putIfAbsent(final Bytes key, final byte[] value) {
+                            return map.putIfAbsent(key, value);
+                        }
+
+                        @Override
+                        public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+                            for (final KeyValue<Bytes, byte[]> entry : entries) {
+                                map.put(entry.key, entry.value);
+                            }
+                        }
+
+                        @Override
+                        public byte[] delete(final Bytes key) {
+                            return map.remove(key);
+                        }
+
+                        @Override
+                        public String name() {
+                            return STORE_NAME;
+                        }
+
+                        @Override
+                        public void init(final ProcessorContext context, final StateStore root) {
+                            context.register(root, (key, value) -> put(Bytes.wrap(key), value));
+                            this.open = true;
+                        }
+
+                        @Override
+                        public void flush() {
+
+                        }
+
+                        @Override
+                        public void close() {
+                            this.open = false;
+                            map.clear();
+                        }
+
+                        @Override
+                        public boolean persistent() {
+                            return false;
+                        }
+
+                        @Override
+                        public boolean isOpen() {
+                            return open;
+                        }
+
+                        @Override
+                        public byte[] get(final Bytes key) {
+                            return map.get(key);
+                        }
+
+                        @Override
+                        public KeyValueIterator<Bytes, byte[]> range(
+                            final Bytes from,
+                            final Bytes to
+                        ) {
+                            throw new UnsupportedOperationException();
+                        }
+
+                        @Override
+                        public KeyValueIterator<Bytes, byte[]> all() {
+                            throw new UnsupportedOperationException();
+                        }
+
+                        @Override
+                        public long approximateNumEntries() {
+                            return map.size();
+                        }
+                    };
+                }
+
+                @Override
+                public String metricsScope() {
+                    return "nonquery";
+                }
+            })
+        );
+
+        kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration());
+        kafkaStreams.cleanUp();
+
+        kafkaStreams.start();
+        final StateQueryResult<Boolean> result =
+            IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(kafkaStreams, request, partitions);
+
+        final QueryResult<Boolean> queryResult = result.getPartitionResults().get(partition);
+        assertThat(queryResult.isFailure(), is(true));
+        assertThat(queryResult.getFailureReason(), is(FailureReason.UNKNOWN_QUERY_TYPE));
+        assertThat(queryResult.getFailureMessage(), matchesPattern(
+            "This store (.*) doesn't know how to execute the given query (.*)."
+                + " Contact the store maintainer if you need support for a new 
query type."
+        ));
+    }
+
+
+    private static Properties streamsConfiguration() {
+        final String safeTestName = IQv2IntegrationTest.class.getName();
+        final Properties config = new Properties();
+        config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
+        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
+        config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + (++port));
+        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
+        config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
+        config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200);
+        config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+        config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
+        return config;
+    }
+}
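
Note: the scoping options exercised in the test above compose on the builder
returned by StateQueryRequest#inStore; a sketch (store name illustrative, not
part of this patch):

    final StateQueryRequest<Boolean> one =
        inStore("kv-store").withQuery(new PingQuery()).withPartitions(singleton(1));
    final StateQueryRequest<Boolean> all =
        inStore("kv-store").withQuery(new PingQuery()).withAllPartitions();
    final StateQueryRequest<Boolean> activeOnly =
        inStore("kv-store").withQuery(new PingQuery()).requireActive();
    // With no explicit scope, the tests above receive results for every locally
    // available partition of the store.
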
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
new file mode 100644
index 0000000..0c0ba39
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
@@ -0,0 +1,559 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.StateQueryRequest;
+import org.apache.kafka.streams.query.StateQueryResult;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.StoreSupplier;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.internals.PingQuery;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.streams.query.StateQueryRequest.inStore;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.matchesPattern;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+@Category({IntegrationTest.class})
+@RunWith(value = Parameterized.class)
+public class IQv2StoreIntegrationTest {
+
+    private static final int NUM_BROKERS = 1;
+    public static final Duration WINDOW_SIZE = Duration.ofMinutes(5);
+    private static int port = 0;
+    private static final String INPUT_TOPIC_NAME = "input-topic";
+    private static final Position INPUT_POSITION = Position.emptyPosition();
+    private static final String STORE_NAME = "kv-store";
+
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    private final StoresToTest storeToTest;
+
+    public static class UnknownQuery implements Query<Void> {
+
+    }
+
+    private final boolean cache;
+    private final boolean log;
+
+    private KafkaStreams kafkaStreams;
+
+    public enum StoresToTest {
+        GLOBAL_IN_MEMORY_KV {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.inMemoryKeyValueStore(STORE_NAME);
+            }
+
+            @Override
+            public boolean global() {
+                return true;
+            }
+        },
+        GLOBAL_IN_MEMORY_LRU {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.lruMap(STORE_NAME, 100);
+            }
+
+            @Override
+            public boolean global() {
+                return true;
+            }
+        },
+        GLOBAL_ROCKS_KV {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentKeyValueStore(STORE_NAME);
+            }
+
+            @Override
+            public boolean global() {
+                return true;
+            }
+        },
+        GLOBAL_TIME_ROCKS_KV {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentTimestampedKeyValueStore(STORE_NAME);
+            }
+
+            @Override
+            public boolean global() {
+                return true;
+            }
+        },
+        IN_MEMORY_KV {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.inMemoryKeyValueStore(STORE_NAME);
+            }
+        },
+        IN_MEMORY_LRU {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.lruMap(STORE_NAME, 100);
+            }
+        },
+        ROCKS_KV {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentKeyValueStore(STORE_NAME);
+            }
+        },
+        TIME_ROCKS_KV {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentTimestampedKeyValueStore(STORE_NAME);
+            }
+        },
+        IN_MEMORY_WINDOW {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.inMemoryWindowStore(STORE_NAME, Duration.ofDays(1), WINDOW_SIZE,
+                    false);
+            }
+        },
+        ROCKS_WINDOW {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentWindowStore(STORE_NAME, Duration.ofDays(1), WINDOW_SIZE,
+                    false);
+            }
+        },
+        TIME_ROCKS_WINDOW {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentTimestampedWindowStore(STORE_NAME, Duration.ofDays(1),
+                    WINDOW_SIZE, false);
+            }
+        },
+        IN_MEMORY_SESSION {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.inMemorySessionStore(STORE_NAME, Duration.ofDays(1));
+            }
+        },
+        ROCKS_SESSION {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentSessionStore(STORE_NAME, Duration.ofDays(1));
+            }
+        };
+
+        public abstract StoreSupplier<?> supplier();
+
+        public boolean global() {
+            return false;
+        }
+    }
+
+    @Parameterized.Parameters(name = "cache={0}, log={1}, supplier={2}")
+    public static Collection<Object[]> data() {
+        final List<Object[]> values = new ArrayList<>();
+        for (final boolean cacheEnabled : Arrays.asList(true, false)) {
+            for (final boolean logEnabled : Arrays.asList(true, false)) {
+                for (final StoresToTest toTest : StoresToTest.values()) {
+                    values.add(new Object[]{cacheEnabled, logEnabled, toTest.name()});
+                }
+            }
+        }
+        return values;
+    }
+
+    public IQv2StoreIntegrationTest(
+        final boolean cache,
+        final boolean log,
+        final String storeToTest) {
+
+        this.cache = cache;
+        this.log = log;
+        this.storeToTest = StoresToTest.valueOf(storeToTest);
+    }
+
+    @BeforeClass
+    public static void before()
+        throws InterruptedException, IOException, ExecutionException, TimeoutException {
+        CLUSTER.start();
+        final int partitions = 2;
+        CLUSTER.createTopic(INPUT_TOPIC_NAME, partitions, 1);
+
+        final Properties producerProps = new Properties();
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+
+        final List<Future<RecordMetadata>> futures = new LinkedList<>();
+        try (final Producer<Integer, Integer> producer = new KafkaProducer<>(producerProps)) {
+            for (int i = 0; i < 3; i++) {
+                final Future<RecordMetadata> send = producer.send(
+                    new ProducerRecord<>(
+                        INPUT_TOPIC_NAME,
+                        i % partitions,
+                        Time.SYSTEM.milliseconds(),
+                        i,
+                        i,
+                        null
+                    )
+                );
+                futures.add(send);
+                Time.SYSTEM.sleep(1L);
+            }
+            producer.flush();
+
+            for (final Future<RecordMetadata> future : futures) {
+                final RecordMetadata recordMetadata = future.get(1, TimeUnit.MINUTES);
+                assertThat(recordMetadata.hasOffset(), is(true));
+                INPUT_POSITION.withComponent(
+                    recordMetadata.topic(),
+                    recordMetadata.partition(),
+                    recordMetadata.offset()
+                );
+            }
+        }
+
+        assertThat(INPUT_POSITION, equalTo(
+            Position
+                .emptyPosition()
+                .withComponent(INPUT_TOPIC_NAME, 0, 1L)
+                .withComponent(INPUT_TOPIC_NAME, 1, 0L)
+        ));
+    }
+
+    @Before
+    public void beforeTest() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final StoreSupplier<?> supplier = storeToTest.supplier();
+        if (supplier instanceof KeyValueBytesStoreSupplier) {
+            final Materialized<Integer, Integer, KeyValueStore<Bytes, byte[]>> materialized =
+                Materialized.as((KeyValueBytesStoreSupplier) supplier);
+
+            if (cache) {
+                materialized.withCachingEnabled();
+            } else {
+                materialized.withCachingDisabled();
+            }
+
+            if (log) {
+                materialized.withLoggingEnabled(Collections.emptyMap());
+            } else {
+                materialized.withLoggingDisabled();
+            }
+
+            if (storeToTest.global()) {
+                builder
+                    .globalTable(
+                        INPUT_TOPIC_NAME,
+                        Consumed.with(Serdes.Integer(), Serdes.Integer()),
+                        materialized
+                    );
+            } else {
+                builder
+                    .table(
+                        INPUT_TOPIC_NAME,
+                        Consumed.with(Serdes.Integer(), Serdes.Integer()),
+                        materialized
+                    );
+            }
+        } else if (supplier instanceof WindowBytesStoreSupplier) {
+            final Materialized<Integer, Integer, WindowStore<Bytes, byte[]>> materialized =
+                Materialized.as((WindowBytesStoreSupplier) supplier);
+
+            if (cache) {
+                materialized.withCachingEnabled();
+            } else {
+                materialized.withCachingDisabled();
+            }
+
+            if (log) {
+                materialized.withLoggingEnabled(Collections.emptyMap());
+            } else {
+                materialized.withLoggingDisabled();
+            }
+
+            builder
+                .stream(
+                    INPUT_TOPIC_NAME,
+                    Consumed.with(Serdes.Integer(), Serdes.Integer())
+                )
+                .groupByKey()
+                .windowedBy(TimeWindows.ofSizeWithNoGrace(WINDOW_SIZE))
+                .aggregate(
+                    () -> 0,
+                    (key, value, aggregate) -> aggregate + value,
+                    materialized
+                );
+        } else if (supplier instanceof SessionBytesStoreSupplier) {
+            final Materialized<Integer, Integer, SessionStore<Bytes, byte[]>> materialized =
+                Materialized.as((SessionBytesStoreSupplier) supplier);
+
+            if (cache) {
+                materialized.withCachingEnabled();
+            } else {
+                materialized.withCachingDisabled();
+            }
+
+            if (log) {
+                materialized.withLoggingEnabled(Collections.emptyMap());
+            } else {
+                materialized.withLoggingDisabled();
+            }
+
+            builder
+                .stream(
+                    INPUT_TOPIC_NAME,
+                    Consumed.with(Serdes.Integer(), Serdes.Integer())
+                )
+                .groupByKey()
+                .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(WINDOW_SIZE))
+                .aggregate(
+                    () -> 0,
+                    (key, value, aggregate) -> aggregate + value,
+                    (aggKey, aggOne, aggTwo) -> aggOne + aggTwo,
+                    materialized
+                );
+        } else {
+            throw new AssertionError("Store supplier is an unrecognized type.");
+        }
+
+        // Don't need to wait for running, since tests can use IQv2 to wait until they
+        // get a valid response.
+        kafkaStreams =
+            IntegrationTestUtils.getStartedStreams(
+                streamsConfiguration(
+                    cache,
+                    log,
+                    supplier.getClass().getSimpleName()
+                ),
+                builder,
+                true
+            );
+    }
+
+    @After
+    public void afterTest() {
+        kafkaStreams.close();
+        kafkaStreams.cleanUp();
+    }
+
+    @AfterClass
+    public static void after() {
+        CLUSTER.stop();
+    }
+
+    @Test
+    public void verifyStore() {
+        shouldRejectUnknownQuery();
+        shouldHandlePingQuery();
+        shouldCollectExecutionInfo();
+        shouldCollectExecutionInfoUnderFailure();
+    }
+
+    public void shouldRejectUnknownQuery() {
+
+        final UnknownQuery query = new UnknownQuery();
+        final StateQueryRequest<Void> request = inStore(STORE_NAME).withQuery(query);
+        final Set<Integer> partitions = mkSet(0, 1);
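+        // The query should fan out to both partitions of the store.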
+
+        final StateQueryResult<Void> result =
+            IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(kafkaStreams, request, partitions);
+
+        makeAssertions(
+            partitions,
+            result,
+            queryResult -> {
+                assertThat(queryResult.isFailure(), is(true));
+                assertThat(queryResult.isSuccess(), is(false));
+                assertThat(queryResult.getPosition(), is(nullValue()));
+                assertThat(queryResult.getFailureReason(),
+                    is(FailureReason.UNKNOWN_QUERY_TYPE));
+                assertThat(queryResult.getFailureMessage(),
+                    matchesPattern(
+                        "This store (.*)"
+                            + " doesn't know how to execute the given query"
+                            + " (.*)."
+                            + " Contact the store maintainer if you need 
support for a new query type."
+                    )
+                );
+                assertThrows(IllegalArgumentException.class, queryResult::getResult);
+
+                assertThat(queryResult.getExecutionInfo(), is(empty()));
+            }
+        );
+    }
+
+    public void shouldHandlePingQuery() {
+
+        final PingQuery query = new PingQuery();
+        final StateQueryRequest<Boolean> request =
+            inStore(STORE_NAME).withQuery(query);
+        final Set<Integer> partitions = mkSet(0, 1);
+
+        final StateQueryResult<Boolean> result =
+            IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(kafkaStreams, request, partitions);
+
+        makeAssertions(
+            partitions,
+            result,
+            queryResult -> {
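+                // If the query failed, include the full result in the assertion message.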
+                final boolean failure = queryResult.isFailure();
+                if (failure) {
+                    assertThat(queryResult.toString(), failure, is(false));
+                }
+                assertThat(queryResult.isSuccess(), is(true));
+
+                // TODO: position not implemented
+                assertThat(queryResult.getPosition(), is(nullValue()));
+
+                assertThrows(IllegalArgumentException.class, queryResult::getFailureReason);
+                assertThrows(IllegalArgumentException.class, queryResult::getFailureMessage);
+
+                assertThat(queryResult.getResult(), is(true));
+
+                assertThat(queryResult.getExecutionInfo(), is(empty()));
+            });
+    }
+
+    public void shouldCollectExecutionInfo() {
+
+        final PingQuery query = new PingQuery();
+        final StateQueryRequest<Boolean> request =
+            inStore(STORE_NAME).withQuery(query).enableExecutionInfo();
+        final Set<Integer> partitions = mkSet(0, 1);
+
+        final StateQueryResult<Boolean> result =
+            IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(kafkaStreams, request, partitions);
+
+        makeAssertions(
+            partitions,
+            result,
+            queryResult -> assertThat(queryResult.getExecutionInfo(), not(empty()))
+        );
+    }
+
+    public void shouldCollectExecutionInfoUnderFailure() {
+
+        final UnknownQuery query = new UnknownQuery();
+        final StateQueryRequest<Void> request =
+            inStore(STORE_NAME).withQuery(query).enableExecutionInfo();
+        final Set<Integer> partitions = mkSet(0, 1);
+
+        final StateQueryResult<Void> result =
+            IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(kafkaStreams, request, partitions);
+
+        makeAssertions(
+            partitions,
+            result,
+            queryResult -> assertThat(queryResult.getExecutionInfo(), not(empty()))
+        );
+    }
+
+    private <R> void makeAssertions(
+        final Set<Integer> partitions,
+        final StateQueryResult<R> result,
+        final Consumer<QueryResult<R>> assertion) {
+
+        if (result.getGlobalResult() != null) {
+            assertion.accept(result.getGlobalResult());
+        } else {
+            assertThat(result.getPartitionResults().keySet(), is(partitions));
+            for (final Integer partition : partitions) {
+                assertion.accept(result.getPartitionResults().get(partition));
+            }
+        }
+    }
+
+    private static Properties streamsConfiguration(final boolean cache, final boolean log,
+        final String supplier) {
+        final String safeTestName =
+            IQv2StoreIntegrationTest.class.getName() + "-" + cache + "-" + log + "-" + supplier;
+        final Properties config = new Properties();
+        config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
+        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
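+        // The application server endpoint is only routing metadata for IQ; bumping
+        // the port keeps each app instance's endpoint unique.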
+        config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + 
(++port));
+        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
+        config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
+        config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200);
+        config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+        config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
+        return config;
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index da457cb..6cfc5d4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -32,6 +32,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState;
 import org.apache.kafka.common.utils.Time;
@@ -50,6 +51,8 @@ import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
 import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentListener;
 import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
+import org.apache.kafka.streams.query.StateQueryRequest;
+import org.apache.kafka.streams.query.StateQueryResult;
 import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
@@ -104,6 +107,40 @@ public class IntegrationTestUtils {
     public static final long DEFAULT_TIMEOUT = 60 * 1000L;
     private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestUtils.class);
 
+    /**
+     * Repeatedly runs the query until the response is valid and then returns the response.
+     * <p>
+     * Validity in this case means that the response contains all the desired partitions or that
+     * it's a global response.
+     * <p>
+     * Once position bounding is generally supported, we should migrate tests to wait on the
+     * expected response position.
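+     * <p>
+     * A sketch of a typical call (the store name and partition set are illustrative):
+     * <pre>{@code
+     * final StateQueryRequest<Boolean> request =
+     *     StateQueryRequest.inStore("my-store").withQuery(new PingQuery());
+     * final StateQueryResult<Boolean> result =
+     *     IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(kafkaStreams, request, mkSet(0, 1));
+     * }</pre>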
+     */
+    public static <R> StateQueryResult<R> iqv2WaitForPartitionsOrGlobal(
+        final KafkaStreams kafkaStreams,
+        final StateQueryRequest<R> request,
+        final Set<Integer> partitions) {
+
+        final long start = System.currentTimeMillis();
+        final long deadline = start + DEFAULT_TIMEOUT;
+
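+        // Poll the query in a loop, sleeping briefly between attempts, until the
+        // response is valid or the deadline passes.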
+        do {
+            final StateQueryResult<R> result = kafkaStreams.query(request);
+            if (result.getPartitionResults().keySet().containsAll(partitions)
+                || result.getGlobalResult() != null) {
+                return result;
+            } else {
+                try {
+                    Thread.sleep(100L);
+                } catch (final InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        } while (System.currentTimeMillis() < deadline);
+
+        throw new TimeoutException("The query never returned the desired 
partitions");
+    }
+
     /*
      * Records state transition for StreamThread
      */
diff --git a/streams/src/test/java/org/apache/kafka/streams/query/PositionBoundTest.java b/streams/src/test/java/org/apache/kafka/streams/query/PositionBoundTest.java
new file mode 100644
index 0000000..fd881bc
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/query/PositionBoundTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.query;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.HashSet;
+
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+public class PositionBoundTest {
+
+    @Test
+    public void shouldCopyPosition() {
+        final Position position = Position.emptyPosition();
+        final PositionBound positionBound = PositionBound.at(position);
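+        // PositionBound.at should take a defensive copy, so this mutation must not
+        // be visible through the bound: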
+        position.withComponent("topic", 1, 2L);
+
+        assertThat(position.getTopics(), equalTo(mkSet("topic")));
+        assertThat(positionBound.position().getTopics(), empty());
+    }
+
+    @Test
+    public void unboundedShouldBeUnbounded() {
+        final PositionBound bound = PositionBound.unbounded();
+        assertTrue(bound.isUnbounded());
+    }
+
+    @Test
+    public void unboundedShouldThrowOnPosition() {
+        final PositionBound bound = PositionBound.unbounded();
+        assertThrows(IllegalArgumentException.class, bound::position);
+    }
+
+    @Test
+    public void shouldEqualPosition() {
+        final PositionBound bound1 = PositionBound.at(Position.emptyPosition());
+        final PositionBound bound2 = PositionBound.at(Position.emptyPosition());
+        assertEquals(bound1, bound2);
+    }
+
+    @Test
+    public void shouldEqualUnbounded() {
+        final PositionBound bound1 = PositionBound.unbounded();
+        final PositionBound bound2 = PositionBound.unbounded();
+        assertEquals(bound1, bound2);
+    }
+
+    @Test
+    public void shouldEqualSelf() {
+        final PositionBound bound1 = PositionBound.at(Position.emptyPosition());
+        assertEquals(bound1, bound1);
+    }
+
+    @Test
+    public void shouldNotEqualNull() {
+        final PositionBound bound1 = PositionBound.at(Position.emptyPosition());
+        assertNotEquals(bound1, null);
+    }
+
+    @Test
+    public void shouldNotHash() {
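+        // PositionBound wraps a mutable Position, so hashing is deliberately unsupported.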
+        final PositionBound bound = PositionBound.at(Position.emptyPosition());
+        assertThrows(UnsupportedOperationException.class, bound::hashCode);
+
+        // going overboard...
+        final HashSet<PositionBound> set = new HashSet<>();
+        assertThrows(UnsupportedOperationException.class, () -> set.add(bound));
+
+        final HashMap<PositionBound, Integer> map = new HashMap<>();
+        assertThrows(UnsupportedOperationException.class, () -> map.put(bound, 5));
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/query/PositionTest.java b/streams/src/test/java/org/apache/kafka/streams/query/PositionTest.java
new file mode 100644
index 0000000..bee7a36
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/query/PositionTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.query;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThrows;
+
+public class PositionTest {
+
+    @Test
+    public void shouldCreateFromMap() {
+        final Map<String, Map<Integer, Long>> map = mkMap(
+            mkEntry("topic", mkMap(mkEntry(0, 5L))),
+            mkEntry("topic1", mkMap(
+                mkEntry(0, 5L),
+                mkEntry(7, 0L)
+            ))
+        );
+
+        final Position position = Position.fromMap(map);
+        assertThat(position.getTopics(), equalTo(mkSet("topic", "topic1")));
+        assertThat(position.getBound("topic"), equalTo(mkMap(mkEntry(0, 5L))));
+
+        // Should be a copy of the constructor map
+
+        map.get("topic1").put(99, 99L);
+
+        // so the position is still the original one
+        assertThat(position.getBound("topic1"), equalTo(mkMap(
+            mkEntry(0, 5L),
+            mkEntry(7, 0L)
+        )));
+    }
+
+    @Test
+    public void shouldCreateFromNullMap() {
+        final Position position = Position.fromMap(null);
+        assertThat(position.getTopics(), equalTo(Collections.emptySet()));
+    }
+
+    @Test
+    public void shouldMerge() {
+        final Position position = Position.fromMap(mkMap(
+            mkEntry("topic", mkMap(mkEntry(0, 5L))),
+            mkEntry("topic1", mkMap(
+                mkEntry(0, 5L),
+                mkEntry(7, 0L)
+            ))
+        ));
+
+        final Position position1 = Position.fromMap(mkMap(
+            mkEntry("topic", mkMap(mkEntry(0, 7L))), // update offset
+            mkEntry("topic1", mkMap(mkEntry(8, 1L))), // add partition
+            mkEntry("topic2", mkMap(mkEntry(9, 5L))) // add topic
+        ));
+
+        final Position merged = position.merge(position1);
+
+        assertThat(merged.getTopics(), equalTo(mkSet("topic", "topic1", 
"topic2")));
+        assertThat(merged.getBound("topic"), equalTo(mkMap(mkEntry(0, 7L))));
+        assertThat(merged.getBound("topic1"), equalTo(mkMap(
+            mkEntry(0, 5L),
+            mkEntry(7, 0L),
+            mkEntry(8, 1L)
+        )));
+        assertThat(merged.getBound("topic2"), equalTo(mkMap(mkEntry(9, 5L))));
+    }
+
+    @Test
+    public void shouldCopy() {
+        final Position position = Position.fromMap(mkMap(
+            mkEntry("topic", mkMap(mkEntry(0, 5L))),
+            mkEntry("topic1", mkMap(
+                mkEntry(0, 5L),
+                mkEntry(7, 0L)
+            ))
+        ));
+
+        final Position copy = position.copy();
+
+        // mutate original
+        position.withComponent("topic", 0, 6L);
+        position.withComponent("topic1", 8, 1L);
+        position.withComponent("topic2", 2, 4L);
+
+        // copy has not changed
+        assertThat(copy.getTopics(), equalTo(mkSet("topic", "topic1")));
+        assertThat(copy.getBound("topic"), equalTo(mkMap(mkEntry(0, 5L))));
+        assertThat(copy.getBound("topic1"), equalTo(mkMap(
+            mkEntry(0, 5L),
+            mkEntry(7, 0L)
+        )));
+
+        // original has changed
+        assertThat(position.getTopics(), equalTo(mkSet("topic", "topic1", 
"topic2")));
+        assertThat(position.getBound("topic"), equalTo(mkMap(mkEntry(0, 6L))));
+        assertThat(position.getBound("topic1"), equalTo(mkMap(
+            mkEntry(0, 5L),
+            mkEntry(7, 0L),
+            mkEntry(8, 1L)
+        )));
+        assertThat(position.getBound("topic2"), equalTo(mkMap(mkEntry(2, 
4L))));
+    }
+
+    @Test
+    public void shouldMergeNull() {
+        final Position position = Position.fromMap(mkMap(
+            mkEntry("topic", mkMap(mkEntry(0, 5L))),
+            mkEntry("topic1", mkMap(
+                mkEntry(0, 5L),
+                mkEntry(7, 0L)
+            ))
+        ));
+
+        final Position merged = position.merge(null);
+
+        assertThat(merged.getTopics(), equalTo(mkSet("topic", "topic1")));
+        assertThat(merged.getBound("topic"), equalTo(mkMap(mkEntry(0, 5L))));
+        assertThat(merged.getBound("topic1"), equalTo(mkMap(
+            mkEntry(0, 5L),
+            mkEntry(7, 0L)
+        )));
+    }
+
+    @Test
+    public void shouldMatchOnEqual() {
+        final Position position1 = Position.emptyPosition();
+        final Position position2 = Position.emptyPosition();
+        position1.withComponent("topic1", 0, 1);
+        position2.withComponent("topic1", 0, 1);
+
+        position1.withComponent("topic1", 1, 2);
+        position2.withComponent("topic1", 1, 2);
+
+        position1.withComponent("topic1", 2, 1);
+        position2.withComponent("topic1", 2, 1);
+
+        position1.withComponent("topic2", 0, 0);
+        position2.withComponent("topic2", 0, 0);
+
+        assertEquals(position1, position2);
+    }
+
+    @Test
+    public void shouldNotMatchOnUnEqual() {
+        final Position position1 = Position.emptyPosition();
+        final Position position2 = Position.emptyPosition();
+        position1.withComponent("topic1", 0, 1);
+        position2.withComponent("topic1", 0, 1);
+
+        position1.withComponent("topic1", 1, 2);
+
+        position1.withComponent("topic1", 2, 1);
+        position2.withComponent("topic1", 2, 1);
+
+        position1.withComponent("topic2", 0, 0);
+        position2.withComponent("topic2", 0, 0);
+
+        assertNotEquals(position1, position2);
+    }
+
+    @Test
+    public void shouldNotMatchNull() {
+        final Position position = Position.emptyPosition();
+        assertNotEquals(position, null);
+    }
+
+    @Test
+    public void shouldMatchSelf() {
+        final Position position = Position.emptyPosition();
+        assertEquals(position, position);
+    }
+
+    @Test
+    public void shouldNotHash() {
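+        // Position is mutable (via withComponent), so hashing is deliberately unsupported.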
+        final Position position = Position.emptyPosition();
+        assertThrows(UnsupportedOperationException.class, position::hashCode);
+
+        // going overboard...
+        final HashSet<Position> set = new HashSet<>();
+        assertThrows(UnsupportedOperationException.class, () -> set.add(position));
+
+        final HashMap<Position, Integer> map = new HashMap<>();
+        assertThrows(UnsupportedOperationException.class, () -> map.put(position, 5));
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 2809050..864500a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import java.util.Optional;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.MetricConfig;
@@ -72,6 +71,7 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
