mjsax commented on code in PR #14596:
URL: https://github.com/apache/kafka/pull/14596#discussion_r1367475515


##########
streams/src/main/java/org/apache/kafka/streams/query/VersionedKeyQuery.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * Interactive query for retrieving a single record  from a versioned state 
store based on its key and timestamp.
+ * <p>
+ * See KIP-960 for more details.

Review Comment:
   I would remove this.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##########
@@ -139,6 +164,38 @@ public ValueAndTimestamp<V> delete(final K key, final long 
timestamp) {
             }
         }
 
+        @SuppressWarnings("unchecked")
+        @Override
+        public <R> QueryResult<R> query(final Query<R> query,
+            final PositionBound positionBound,
+            final QueryConfig config) {

Review Comment:
   nit: fix indentation (all parameters should align)



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##########
@@ -148,13 +205,38 @@ protected <R> QueryResult<R> runRangeQuery(final Query<R> 
query,
             throw new UnsupportedOperationException("Versioned stores do not 
support RangeQuery queries at this time.");
         }
 
+        @SuppressWarnings("unchecked")
         @Override
         protected <R> QueryResult<R> runKeyQuery(final Query<R> query,
-                                                 final PositionBound 
positionBound,
-                                                 final QueryConfig config) {
-            // throw exception for now to reserve the ability to implement 
this in the future
-            // without clashing with users' custom implementations in the 
meantime
-            throw new UnsupportedOperationException("Versioned stores do not 
support KeyQuery queries at this time.");
+            final PositionBound positionBound,
+            final QueryConfig config) {
+            if (query instanceof VersionedKeyQuery) {
+                final QueryResult<R> result;
+                final VersionedKeyQuery<K, V> typedKeyQuery = 
(VersionedKeyQuery<K, V>) query;
+                VersionedKeyQuery<Bytes, byte[]> rawKeyQuery;
+                if (typedKeyQuery.asOfTimestamp().isPresent()) {
+                    rawKeyQuery = 
VersionedKeyQuery.withKey(keyBytes(typedKeyQuery.key()));
+                    rawKeyQuery = 
rawKeyQuery.asOf(typedKeyQuery.asOfTimestamp().get());
+                } else {
+                    rawKeyQuery = 
VersionedKeyQuery.withKey(keyBytes(typedKeyQuery.key()));
+                }
+                final QueryResult<VersionedRecord<byte[]>> rawResult =
+                    wrapped().query(rawKeyQuery, positionBound, config);
+                if (rawResult.isSuccess()) {
+                    final Function<byte[], ValueAndTimestamp<V>> deserializer 
= getDeserializeValue(plainValueSerdes);

Review Comment:
   Why is the type `ValueAndTimestamp`? Should it not be `VersionedRecord`?



##########
streams/src/main/java/org/apache/kafka/streams/query/VersionedKeyQuery.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * Interactive query for retrieving a single record  from a versioned state 
store based on its key and timestamp.

Review Comment:
   nit, remove double space



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##########
@@ -89,6 +102,18 @@ private class MeteredVersionedKeyValueStoreInternal
         private final Serde<V> plainValueSerde;
         private StateSerdes<K, V> plainValueSerdes;
 
+        private final Map<Class, QueryHandler> queryHandlers =
+            mkMap(
+                mkEntry(
+                    RangeQuery.class,

Review Comment:
   Do we need to add this one? `handleRangeQuery` is not implemented anyway?



##########
streams/src/main/java/org/apache/kafka/streams/query/VersionedKeyQuery.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * Interactive query for retrieving a single record  from a versioned state 
store based on its key and timestamp.
+ * <p>
+ * See KIP-960 for more details.
+ */
+@Evolving
+public final class VersionedKeyQuery<K, V> implements 
Query<VersionedRecord<V>> {
+
+    private final K key;
+    private final Optional<Instant> asOfTimestamp;
+
+    private VersionedKeyQuery(final K key, final Optional<Instant> 
asOfTimestamp) {
+        this.key = Objects.requireNonNull(key);
+        this.asOfTimestamp = asOfTimestamp;
+    }
+
+    /**
+     * Creates a query that will retrieve the record from a versioned state 
store identified by {@code key} if it exists
+     * (or {@code null} otherwise).
+     * @param key The key to retrieve
+     * @param <K> The type of the key
+     * @param <V> The type of the value that will be retrieved
+     */
+    public static <K, V> VersionedKeyQuery<K, V> withKey(final K key) {
+        return new VersionedKeyQuery<>(key, Optional.empty());
+    }
+
+    /**
+     * Specifies the as of timestamp for the key query. The key query returns 
the record

Review Comment:
   Should we just say "Specifies the timestamp for the query."?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##########
@@ -148,13 +205,38 @@ protected <R> QueryResult<R> runRangeQuery(final Query<R> 
query,
             throw new UnsupportedOperationException("Versioned stores do not 
support RangeQuery queries at this time.");
         }
 
+        @SuppressWarnings("unchecked")
         @Override
         protected <R> QueryResult<R> runKeyQuery(final Query<R> query,
-                                                 final PositionBound 
positionBound,
-                                                 final QueryConfig config) {
-            // throw exception for now to reserve the ability to implement 
this in the future
-            // without clashing with users' custom implementations in the 
meantime
-            throw new UnsupportedOperationException("Versioned stores do not 
support KeyQuery queries at this time.");
+            final PositionBound positionBound,
+            final QueryConfig config) {
+            if (query instanceof VersionedKeyQuery) {
+                final QueryResult<R> result;
+                final VersionedKeyQuery<K, V> typedKeyQuery = 
(VersionedKeyQuery<K, V>) query;
+                VersionedKeyQuery<Bytes, byte[]> rawKeyQuery;
+                if (typedKeyQuery.asOfTimestamp().isPresent()) {
+                    rawKeyQuery = 
VersionedKeyQuery.withKey(keyBytes(typedKeyQuery.key()));
+                    rawKeyQuery = 
rawKeyQuery.asOf(typedKeyQuery.asOfTimestamp().get());
+                } else {
+                    rawKeyQuery = 
VersionedKeyQuery.withKey(keyBytes(typedKeyQuery.key()));

Review Comment:
   Duplicate code -- let's move this one before the `if` condition.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##########
@@ -148,13 +205,38 @@ protected <R> QueryResult<R> runRangeQuery(final Query<R> 
query,
             throw new UnsupportedOperationException("Versioned stores do not 
support RangeQuery queries at this time.");
         }
 
+        @SuppressWarnings("unchecked")
         @Override
         protected <R> QueryResult<R> runKeyQuery(final Query<R> query,

Review Comment:
   Should we reserve this for `KeyQuery`, and add a new `versionedKeyQuery` 
method?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##########
@@ -198,6 +280,19 @@ protected void initStoreSerde(final StateStoreContext 
context) {
                 prepareValueSerde(plainValueSerde, new SerdeGetter(context))
             );
         }
+
+        private byte[] serializeAsBytes(final VersionedRecord<byte[]> 
versionedRecord) {
+            if (versionedRecord == null) {
+                return null;
+            }
+            final Serde<ValueAndTimestamp<byte[]>> VALUE_AND_TIMESTAMP_SERDE

Review Comment:
   Why do we create a `Serde` -- if we only need the `Serializer` we can just 
use `new ValueAndTimestampSerializer`



##########
streams/src/main/java/org/apache/kafka/streams/query/VersionedKeyQuery.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * Interactive query for retrieving a single record  from a versioned state 
store based on its key and timestamp.
+ * <p>
+ * See KIP-960 for more details.
+ */
+@Evolving
+public final class VersionedKeyQuery<K, V> implements 
Query<VersionedRecord<V>> {
+
+    private final K key;
+    private final Optional<Instant> asOfTimestamp;
+
+    private VersionedKeyQuery(final K key, final Optional<Instant> 
asOfTimestamp) {
+        this.key = Objects.requireNonNull(key);
+        this.asOfTimestamp = asOfTimestamp;
+    }
+
+    /**
+     * Creates a query that will retrieve the record from a versioned state 
store identified by {@code key} if it exists
+     * (or {@code null} otherwise).
+     * @param key The key to retrieve
+     * @param <K> The type of the key
+     * @param <V> The type of the value that will be retrieved
+     */
+    public static <K, V> VersionedKeyQuery<K, V> withKey(final K key) {
+        return new VersionedKeyQuery<>(key, Optional.empty());
+    }
+
+    /**
+     * Specifies the as of timestamp for the key query. The key query returns 
the record
+     * with the greatest timestamp <= asOfTimestamp

Review Comment:
   Should we be more colloquial, and say, "the record version for the 
specified timestamp." or something like this?
   
   `greatest timestamp <= asOfTimestamp` is correct, but kinda hard to reason 
about -- note: many of our users are not as close to the project, and we should 
make it easy for them to understand. -- Happy to keep the exact condition as 
additional information.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##########
@@ -148,13 +205,38 @@ protected <R> QueryResult<R> runRangeQuery(final Query<R> 
query,
             throw new UnsupportedOperationException("Versioned stores do not 
support RangeQuery queries at this time.");
         }
 
+        @SuppressWarnings("unchecked")
         @Override
         protected <R> QueryResult<R> runKeyQuery(final Query<R> query,
-                                                 final PositionBound 
positionBound,
-                                                 final QueryConfig config) {
-            // throw exception for now to reserve the ability to implement 
this in the future
-            // without clashing with users' custom implementations in the 
meantime
-            throw new UnsupportedOperationException("Versioned stores do not 
support KeyQuery queries at this time.");
+            final PositionBound positionBound,
+            final QueryConfig config) {
+            if (query instanceof VersionedKeyQuery) {
+                final QueryResult<R> result;
+                final VersionedKeyQuery<K, V> typedKeyQuery = 
(VersionedKeyQuery<K, V>) query;
+                VersionedKeyQuery<Bytes, byte[]> rawKeyQuery;
+                if (typedKeyQuery.asOfTimestamp().isPresent()) {
+                    rawKeyQuery = 
VersionedKeyQuery.withKey(keyBytes(typedKeyQuery.key()));
+                    rawKeyQuery = 
rawKeyQuery.asOf(typedKeyQuery.asOfTimestamp().get());
+                } else {
+                    rawKeyQuery = 
VersionedKeyQuery.withKey(keyBytes(typedKeyQuery.key()));
+                }
+                final QueryResult<VersionedRecord<byte[]>> rawResult =
+                    wrapped().query(rawKeyQuery, positionBound, config);
+                if (rawResult.isSuccess()) {
+                    final Function<byte[], ValueAndTimestamp<V>> deserializer 
= getDeserializeValue(plainValueSerdes);
+                    final ValueAndTimestamp<V>  valueAndTimestamp = 
deserializer.apply(serializeAsBytes(rawResult.getResult()));
+                    final VersionedRecord<V> versionedRecord = new 
VersionedRecord<>(valueAndTimestamp.value(), valueAndTimestamp.timestamp());
+                    final QueryResult<VersionedRecord<V>> typedQueryResult =
+                        
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, 
versionedRecord);
+                    result = (QueryResult<R>) typedQueryResult;
+                } else {
+                    // the generic type doesn't matter, since failed queries 
have no result set.
+                    result = (QueryResult<R>) rawResult;
+                }
+                return result;
+            }
+            // reserved for other IQv2 query types (KIP-968, KIP-969)
+            return null;

Review Comment:
   Should we throw an exception instead? We should never hit this point anyway, 
but might be better to have a dedicated exception to guard against bugs?



##########
streams/src/main/java/org/apache/kafka/streams/query/VersionedKeyQuery.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * Interactive query for retrieving a single record  from a versioned state 
store based on its key and timestamp.
+ * <p>
+ * See KIP-960 for more details.
+ */
+@Evolving
+public final class VersionedKeyQuery<K, V> implements 
Query<VersionedRecord<V>> {
+
+    private final K key;
+    private final Optional<Instant> asOfTimestamp;
+
+    private VersionedKeyQuery(final K key, final Optional<Instant> 
asOfTimestamp) {
+        this.key = Objects.requireNonNull(key);
+        this.asOfTimestamp = asOfTimestamp;
+    }
+
+    /**
+     * Creates a query that will retrieve the record from a versioned state 
store identified by {@code key} if it exists
+     * (or {@code null} otherwise).
+     * @param key The key to retrieve
+     * @param <K> The type of the key
+     * @param <V> The type of the value that will be retrieved
+     */
+    public static <K, V> VersionedKeyQuery<K, V> withKey(final K key) {
+        return new VersionedKeyQuery<>(key, Optional.empty());
+    }
+
+    /**
+     * Specifies the as of timestamp for the key query. The key query returns 
the record
+     * with the greatest timestamp <= asOfTimestamp
+     * @param asOfTimestamp The as of timestamp for timestamp
+     * @throws NullPointerException if @param asOfTimestamp is null

Review Comment:
   Should we add the same for `withKey` ?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##########
@@ -198,6 +280,19 @@ protected void initStoreSerde(final StateStoreContext 
context) {
                 prepareValueSerde(plainValueSerde, new SerdeGetter(context))
             );
         }
+
+        private byte[] serializeAsBytes(final VersionedRecord<byte[]> 
versionedRecord) {
+            if (versionedRecord == null) {
+                return null;
+            }
+            final Serde<ValueAndTimestamp<byte[]>> VALUE_AND_TIMESTAMP_SERDE
+                = new ValueAndTimestampSerde<>(new ByteArraySerde());
+            final Serializer<ValueAndTimestamp<byte[]>> 
VALUE_AND_TIMESTAMP_SERIALIZER

Review Comment:
   We should use `try-with-resources` for the serializer to avoid warnings about 
"unclosed" resources



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##########
@@ -198,6 +280,19 @@ protected void initStoreSerde(final StateStoreContext 
context) {
                 prepareValueSerde(plainValueSerde, new SerdeGetter(context))
             );
         }
+
+        private byte[] serializeAsBytes(final VersionedRecord<byte[]> 
versionedRecord) {
+            if (versionedRecord == null) {
+                return null;
+            }
+            final Serde<ValueAndTimestamp<byte[]>> VALUE_AND_TIMESTAMP_SERDE
+                = new ValueAndTimestampSerde<>(new ByteArraySerde());
+            final Serializer<ValueAndTimestamp<byte[]>> 
VALUE_AND_TIMESTAMP_SERIALIZER
+                = VALUE_AND_TIMESTAMP_SERDE.serializer();
+            return VALUE_AND_TIMESTAMP_SERIALIZER.serialize(
+                null,
+                ValueAndTimestamp.make(versionedRecord.value(), 
versionedRecord.timestamp()));

Review Comment:
   Similar to above: why do we actually use `ValueAndTimestamp` ?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedStoreQueryUtils.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.time.temporal.ChronoField;
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.VersionedKeyQuery;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+public final class VersionedStoreQueryUtils {
+
+    /**
+     * a utility interface to facilitate stores' query dispatch logic,
+     * allowing them to generically store query execution logic as the values
+     * in a map.
+     */
+    @FunctionalInterface
+    public interface QueryHandler {
+        QueryResult<?> apply(
+            final Query<?> query,
+            final PositionBound positionBound,
+            final QueryConfig config,
+            final StateStore store
+        );
+    }
+
+    @SuppressWarnings("rawtypes")
+    private static final Map<Class, QueryHandler> QUERY_HANDLER_MAP =
+        mkMap(
+            mkEntry(
+                VersionedKeyQuery.class,
+                VersionedStoreQueryUtils::runKeyQuery
+            )
+        );
+
+    // make this class uninstantiable
+
+    private VersionedStoreQueryUtils() {
+    }
+    @SuppressWarnings("unchecked")
+    public static <R> QueryResult<R> handleBasicQueries(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final QueryConfig config,
+        final StateStore store,
+        final Position position,
+        final StateStoreContext context
+    ) {
+
+        final long start = config.isCollectExecutionInfo() ? System.nanoTime() 
: -1L;
+        final QueryResult<R> result;
+
+        final QueryHandler handler = QUERY_HANDLER_MAP.get(query.getClass());
+        if (handler == null) {
+            result = QueryResult.forUnknownQueryType(query, store);
+        } else if (context == null || !isPermitted(position, positionBound, 
context.taskId().partition())) {
+            result = QueryResult.notUpToBound(
+                position,
+                positionBound,
+                context == null ? null : context.taskId().partition()
+            );
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                config,
+                store
+            );
+        }
+        if (config.isCollectExecutionInfo()) {
+            result.addExecutionInfo(
+                "Handled in " + store.getClass() + " in " + (System.nanoTime() 
- start) + "ns"
+            );
+        }
+        result.setPosition(position);
+        return result;
+    }
+
+    public static boolean isPermitted(
+        final Position position,
+        final PositionBound positionBound,
+        final int partition
+    ) {
+        final Position bound = positionBound.position();
+        for (final String topic : bound.getTopics()) {
+            final Map<Integer, Long> partitionBounds = 
bound.getPartitionPositions(topic);
+            final Map<Integer, Long> seenPartitionPositions = 
position.getPartitionPositions(topic);
+            if (!partitionBounds.containsKey(partition)) {
+                // this topic isn't bounded for our partition, so just skip 
over it.
+            } else {
+                if (!seenPartitionPositions.containsKey(partition)) {
+                    // we haven't seen a partition that we have a bound for
+                    return false;
+                } else if (seenPartitionPositions.get(partition) < 
partitionBounds.get(partition)) {
+                    // our current position is behind the bound
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <R> QueryResult<R> runKeyQuery(final Query<R> query,
+                                                  final PositionBound 
positionBound,
+                                                  final QueryConfig config,
+                                                  final StateStore store) {
+        if (store instanceof VersionedKeyValueStore) {
+            final VersionedKeyValueStore<Bytes, byte[]> versionedKeyValueStore 
=
+                (VersionedKeyValueStore<Bytes, byte[]>) store;
+            if (query instanceof VersionedKeyQuery) {
+                final VersionedKeyQuery<Bytes, byte[]> rawKeyQuery =
+                    (VersionedKeyQuery<Bytes, byte[]>) query;
+                try {
+                    final VersionedRecord<byte[]> bytes;
+                    if (((VersionedKeyQuery<?, ?>) 
query).asOfTimestamp().isPresent()) {
+                        bytes = versionedKeyValueStore.get(rawKeyQuery.key(),
+                            ((VersionedKeyQuery<?, ?>) 
query).asOfTimestamp().get()
+                                .getLong(ChronoField.INSTANT_SECONDS));
+                    } else {
+                        bytes = versionedKeyValueStore.get(rawKeyQuery.key());
+                    }
+                    return (QueryResult<R>) QueryResult.forResult(bytes);
+                } catch (final Exception e) {
+                    final String message = parseStoreException(e, store, 
query);
+                    return QueryResult.forFailure(
+                        FailureReason.STORE_EXCEPTION,
+                        message
+                    );
+                }
+            } else {
+                // Here is preserved for VersionedMultiTimestampedKeyQuery
+                return QueryResult.forUnknownQueryType(query, store);
+            }
+        } else {
+            return QueryResult.forUnknownQueryType(query, store);
+        }
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public static <V> Function<byte[], ValueAndTimestamp<V>> 
getDeserializeValue(
+        final StateSerdes<?, V> serdes) {

Review Comment:
   nit: Why is this on its own line?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedStoreQueryUtils.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.time.temporal.ChronoField;
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.VersionedKeyQuery;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+public final class VersionedStoreQueryUtils {
+
+    /**
+     * a utility interface to facilitate stores' query dispatch logic,
+     * allowing them to generically store query execution logic as the values
+     * in a map.
+     */
+    @FunctionalInterface
+    public interface QueryHandler {
+        QueryResult<?> apply(
+            final Query<?> query,
+            final PositionBound positionBound,
+            final QueryConfig config,
+            final StateStore store
+        );
+    }
+
+    @SuppressWarnings("rawtypes")
+    private static final Map<Class, QueryHandler> QUERY_HANDLER_MAP =
+        mkMap(
+            mkEntry(
+                VersionedKeyQuery.class,
+                VersionedStoreQueryUtils::runKeyQuery
+            )
+        );
+
+    // make this class uninstantiable
+
+    private VersionedStoreQueryUtils() {
+    }
+    @SuppressWarnings("unchecked")
+    public static <R> QueryResult<R> handleBasicQueries(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final QueryConfig config,
+        final StateStore store,
+        final Position position,
+        final StateStoreContext context
+    ) {
+
+        final long start = config.isCollectExecutionInfo() ? System.nanoTime() 
: -1L;
+        final QueryResult<R> result;
+
+        final QueryHandler handler = QUERY_HANDLER_MAP.get(query.getClass());
+        if (handler == null) {
+            result = QueryResult.forUnknownQueryType(query, store);
+        } else if (context == null || !isPermitted(position, positionBound, 
context.taskId().partition())) {
+            result = QueryResult.notUpToBound(
+                position,
+                positionBound,
+                context == null ? null : context.taskId().partition()
+            );
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                config,
+                store
+            );
+        }
+        if (config.isCollectExecutionInfo()) {
+            result.addExecutionInfo(
+                "Handled in " + store.getClass() + " in " + (System.nanoTime() 
- start) + "ns"
+            );
+        }
+        result.setPosition(position);
+        return result;
+    }
+
+    public static boolean isPermitted(
+        final Position position,
+        final PositionBound positionBound,
+        final int partition
+    ) {
+        final Position bound = positionBound.position();
+        for (final String topic : bound.getTopics()) {
+            final Map<Integer, Long> partitionBounds = 
bound.getPartitionPositions(topic);
+            final Map<Integer, Long> seenPartitionPositions = 
position.getPartitionPositions(topic);
+            if (!partitionBounds.containsKey(partition)) {
+                // this topic isn't bounded for our partition, so just skip 
over it.
+            } else {
+                if (!seenPartitionPositions.containsKey(partition)) {
+                    // we haven't seen a partition that we have a bound for
+                    return false;
+                } else if (seenPartitionPositions.get(partition) < 
partitionBounds.get(partition)) {
+                    // our current position is behind the bound
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <R> QueryResult<R> runKeyQuery(final Query<R> query,

Review Comment:
   should this be `runVersionedKeyQuery` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to