mjsax commented on code in PR #14596: URL: https://github.com/apache/kafka/pull/14596#discussion_r1367475515
########## streams/src/main/java/org/apache/kafka/streams/query/VersionedKeyQuery.java: ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + +import java.time.Instant; +import java.util.Objects; +import java.util.Optional; +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; +import org.apache.kafka.streams.state.VersionedRecord; + +/** + * Interactive query for retrieving a single record from a versioned state store based on its key and timestamp. + * <p> + * See KIP-960 for more details. Review Comment: I would remove this. 
########## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java: ########## @@ -139,6 +164,38 @@ public ValueAndTimestamp<V> delete(final K key, final long timestamp) { } } + @SuppressWarnings("unchecked") + @Override + public <R> QueryResult<R> query(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { Review Comment: nit: fix indention (all parameters should align) ########## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java: ########## @@ -148,13 +205,38 @@ protected <R> QueryResult<R> runRangeQuery(final Query<R> query, throw new UnsupportedOperationException("Versioned stores do not support RangeQuery queries at this time."); } + @SuppressWarnings("unchecked") @Override protected <R> QueryResult<R> runKeyQuery(final Query<R> query, - final PositionBound positionBound, - final QueryConfig config) { - // throw exception for now to reserve the ability to implement this in the future - // without clashing with users' custom implementations in the meantime - throw new UnsupportedOperationException("Versioned stores do not support KeyQuery queries at this time."); + final PositionBound positionBound, + final QueryConfig config) { + if (query instanceof VersionedKeyQuery) { + final QueryResult<R> result; + final VersionedKeyQuery<K, V> typedKeyQuery = (VersionedKeyQuery<K, V>) query; + VersionedKeyQuery<Bytes, byte[]> rawKeyQuery; + if (typedKeyQuery.asOfTimestamp().isPresent()) { + rawKeyQuery = VersionedKeyQuery.withKey(keyBytes(typedKeyQuery.key())); + rawKeyQuery = rawKeyQuery.asOf(typedKeyQuery.asOfTimestamp().get()); + } else { + rawKeyQuery = VersionedKeyQuery.withKey(keyBytes(typedKeyQuery.key())); + } + final QueryResult<VersionedRecord<byte[]>> rawResult = + wrapped().query(rawKeyQuery, positionBound, config); + if (rawResult.isSuccess()) { + final Function<byte[], ValueAndTimestamp<V>> deserializer = 
getDeserializeValue(plainValueSerdes); Review Comment: Why is the type `ValueAndTimestamp`? Should it not be `VersionedRecord`? ########## streams/src/main/java/org/apache/kafka/streams/query/VersionedKeyQuery.java: ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + +import java.time.Instant; +import java.util.Objects; +import java.util.Optional; +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; +import org.apache.kafka.streams.state.VersionedRecord; + +/** + * Interactive query for retrieving a single record from a versioned state store based on its key and timestamp. Review Comment: nit, remove double space ########## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java: ########## @@ -89,6 +102,18 @@ private class MeteredVersionedKeyValueStoreInternal private final Serde<V> plainValueSerde; private StateSerdes<K, V> plainValueSerdes; + private final Map<Class, QueryHandler> queryHandlers = + mkMap( + mkEntry( + RangeQuery.class, Review Comment: Do we need to add this one? `handleRangeQuery` is not implemented anyway? 
########## streams/src/main/java/org/apache/kafka/streams/query/VersionedKeyQuery.java: ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + +import java.time.Instant; +import java.util.Objects; +import java.util.Optional; +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; +import org.apache.kafka.streams.state.VersionedRecord; + +/** + * Interactive query for retrieving a single record from a versioned state store based on its key and timestamp. + * <p> + * See KIP-960 for more details. + */ +@Evolving +public final class VersionedKeyQuery<K, V> implements Query<VersionedRecord<V>> { + + private final K key; + private final Optional<Instant> asOfTimestamp; + + private VersionedKeyQuery(final K key, final Optional<Instant> asOfTimestamp) { + this.key = Objects.requireNonNull(key); + this.asOfTimestamp = asOfTimestamp; + } + + /** + * Creates a query that will retrieve the record from a versioned state store identified by {@code key} if it exists + * (or {@code null} otherwise). 
+ * @param key The key to retrieve + * @param <K> The type of the key + * @param <V> The type of the value that will be retrieved + */ + public static <K, V> VersionedKeyQuery<K, V> withKey(final K key) { + return new VersionedKeyQuery<>(key, Optional.empty()); + } + + /** + * Specifies the as of timestamp for the key query. The key query returns the record Review Comment: Should we just say "Specifies the timestamp for the query."? ########## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java: ########## @@ -148,13 +205,38 @@ protected <R> QueryResult<R> runRangeQuery(final Query<R> query, throw new UnsupportedOperationException("Versioned stores do not support RangeQuery queries at this time."); } + @SuppressWarnings("unchecked") @Override protected <R> QueryResult<R> runKeyQuery(final Query<R> query, - final PositionBound positionBound, - final QueryConfig config) { - // throw exception for now to reserve the ability to implement this in the future - // without clashing with users' custom implementations in the meantime - throw new UnsupportedOperationException("Versioned stores do not support KeyQuery queries at this time."); + final PositionBound positionBound, + final QueryConfig config) { + if (query instanceof VersionedKeyQuery) { + final QueryResult<R> result; + final VersionedKeyQuery<K, V> typedKeyQuery = (VersionedKeyQuery<K, V>) query; + VersionedKeyQuery<Bytes, byte[]> rawKeyQuery; + if (typedKeyQuery.asOfTimestamp().isPresent()) { + rawKeyQuery = VersionedKeyQuery.withKey(keyBytes(typedKeyQuery.key())); + rawKeyQuery = rawKeyQuery.asOf(typedKeyQuery.asOfTimestamp().get()); + } else { + rawKeyQuery = VersionedKeyQuery.withKey(keyBytes(typedKeyQuery.key())); Review Comment: Duplicate code -- let's move this one before the `if` condition. 
########## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java: ########## @@ -148,13 +205,38 @@ protected <R> QueryResult<R> runRangeQuery(final Query<R> query, throw new UnsupportedOperationException("Versioned stores do not support RangeQuery queries at this time."); } + @SuppressWarnings("unchecked") @Override protected <R> QueryResult<R> runKeyQuery(final Query<R> query, Review Comment: Should we reserve this for `KeyQuery`, and add a new `versionedKeyQuery` method? ########## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java: ########## @@ -198,6 +280,19 @@ protected void initStoreSerde(final StateStoreContext context) { prepareValueSerde(plainValueSerde, new SerdeGetter(context)) ); } + + private byte[] serializeAsBytes(final VersionedRecord<byte[]> versionedRecord) { + if (versionedRecord == null) { + return null; + } + final Serde<ValueAndTimestamp<byte[]>> VALUE_AND_TIMESTAMP_SERDE Review Comment: Why do we create a `Serde` -- if we only need the `Serializer` we can just use `new ValueAndTimestampSerializer` ########## streams/src/main/java/org/apache/kafka/streams/query/VersionedKeyQuery.java: ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + +import java.time.Instant; +import java.util.Objects; +import java.util.Optional; +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; +import org.apache.kafka.streams.state.VersionedRecord; + +/** + * Interactive query for retrieving a single record from a versioned state store based on its key and timestamp. + * <p> + * See KIP-960 for more details. + */ +@Evolving +public final class VersionedKeyQuery<K, V> implements Query<VersionedRecord<V>> { + + private final K key; + private final Optional<Instant> asOfTimestamp; + + private VersionedKeyQuery(final K key, final Optional<Instant> asOfTimestamp) { + this.key = Objects.requireNonNull(key); + this.asOfTimestamp = asOfTimestamp; + } + + /** + * Creates a query that will retrieve the record from a versioned state store identified by {@code key} if it exists + * (or {@code null} otherwise). + * @param key The key to retrieve + * @param <K> The type of the key + * @param <V> The type of the value that will be retrieved + */ + public static <K, V> VersionedKeyQuery<K, V> withKey(final K key) { + return new VersionedKeyQuery<>(key, Optional.empty()); + } + + /** + * Specifies the as of timestamp for the key query. The key query returns the record + * with the greatest timestamp <= asOfTimestamp Review Comment: Should we be more colloquial, and say, "the record version for the specified timestamp." or something like this? `greatest timestamp <= asOfTimestamp` is correct, but kinda hard to reason about -- note: many of our users are not as close to the project, and we should make it easy for them to understand. -- Happy to keep the exact condition as additional information. 
########## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java: ########## @@ -148,13 +205,38 @@ protected <R> QueryResult<R> runRangeQuery(final Query<R> query, throw new UnsupportedOperationException("Versioned stores do not support RangeQuery queries at this time."); } + @SuppressWarnings("unchecked") @Override protected <R> QueryResult<R> runKeyQuery(final Query<R> query, - final PositionBound positionBound, - final QueryConfig config) { - // throw exception for now to reserve the ability to implement this in the future - // without clashing with users' custom implementations in the meantime - throw new UnsupportedOperationException("Versioned stores do not support KeyQuery queries at this time."); + final PositionBound positionBound, + final QueryConfig config) { + if (query instanceof VersionedKeyQuery) { + final QueryResult<R> result; + final VersionedKeyQuery<K, V> typedKeyQuery = (VersionedKeyQuery<K, V>) query; + VersionedKeyQuery<Bytes, byte[]> rawKeyQuery; + if (typedKeyQuery.asOfTimestamp().isPresent()) { + rawKeyQuery = VersionedKeyQuery.withKey(keyBytes(typedKeyQuery.key())); + rawKeyQuery = rawKeyQuery.asOf(typedKeyQuery.asOfTimestamp().get()); + } else { + rawKeyQuery = VersionedKeyQuery.withKey(keyBytes(typedKeyQuery.key())); + } + final QueryResult<VersionedRecord<byte[]>> rawResult = + wrapped().query(rawKeyQuery, positionBound, config); + if (rawResult.isSuccess()) { + final Function<byte[], ValueAndTimestamp<V>> deserializer = getDeserializeValue(plainValueSerdes); + final ValueAndTimestamp<V> valueAndTimestamp = deserializer.apply(serializeAsBytes(rawResult.getResult())); + final VersionedRecord<V> versionedRecord = new VersionedRecord<>(valueAndTimestamp.value(), valueAndTimestamp.timestamp()); + final QueryResult<VersionedRecord<V>> typedQueryResult = + InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, versionedRecord); + result = (QueryResult<R>) typedQueryResult; + } else 
{ + // the generic type doesn't matter, since failed queries have no result set. + result = (QueryResult<R>) rawResult; + } + return result; + } + // reserved for other IQv2 query types (KIP-968, KIP-969) + return null; Review Comment: Should we throw an exception instead? We should never hit this point anyway, but might be better to have a dedicated exception to guard against bugs? ########## streams/src/main/java/org/apache/kafka/streams/query/VersionedKeyQuery.java: ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + +import java.time.Instant; +import java.util.Objects; +import java.util.Optional; +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; +import org.apache.kafka.streams.state.VersionedRecord; + +/** + * Interactive query for retrieving a single record from a versioned state store based on its key and timestamp. + * <p> + * See KIP-960 for more details. 
+ */ +@Evolving +public final class VersionedKeyQuery<K, V> implements Query<VersionedRecord<V>> { + + private final K key; + private final Optional<Instant> asOfTimestamp; + + private VersionedKeyQuery(final K key, final Optional<Instant> asOfTimestamp) { + this.key = Objects.requireNonNull(key); + this.asOfTimestamp = asOfTimestamp; + } + + /** + * Creates a query that will retrieve the record from a versioned state store identified by {@code key} if it exists + * (or {@code null} otherwise). + * @param key The key to retrieve + * @param <K> The type of the key + * @param <V> The type of the value that will be retrieved + */ + public static <K, V> VersionedKeyQuery<K, V> withKey(final K key) { + return new VersionedKeyQuery<>(key, Optional.empty()); + } + + /** + * Specifies the as of timestamp for the key query. The key query returns the record + * with the greatest timestamp <= asOfTimestamp + * @param asOfTimestamp The as of timestamp for timestamp + * @throws NullPointerException if @param asOfTimestamp is null Review Comment: Should we add the same for `withKey` ? 
########## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java: ########## @@ -198,6 +280,19 @@ protected void initStoreSerde(final StateStoreContext context) { prepareValueSerde(plainValueSerde, new SerdeGetter(context)) ); } + + private byte[] serializeAsBytes(final VersionedRecord<byte[]> versionedRecord) { + if (versionedRecord == null) { + return null; + } + final Serde<ValueAndTimestamp<byte[]>> VALUE_AND_TIMESTAMP_SERDE + = new ValueAndTimestampSerde<>(new ByteArraySerde()); + final Serializer<ValueAndTimestamp<byte[]>> VALUE_AND_TIMESTAMP_SERIALIZER Review Comment: we should use `try-with-resource` for the serializer to avoid warning about "unclosed" resources ########## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java: ########## @@ -198,6 +280,19 @@ protected void initStoreSerde(final StateStoreContext context) { prepareValueSerde(plainValueSerde, new SerdeGetter(context)) ); } + + private byte[] serializeAsBytes(final VersionedRecord<byte[]> versionedRecord) { + if (versionedRecord == null) { + return null; + } + final Serde<ValueAndTimestamp<byte[]>> VALUE_AND_TIMESTAMP_SERDE + = new ValueAndTimestampSerde<>(new ByteArraySerde()); + final Serializer<ValueAndTimestamp<byte[]>> VALUE_AND_TIMESTAMP_SERIALIZER + = VALUE_AND_TIMESTAMP_SERDE.serializer(); + return VALUE_AND_TIMESTAMP_SERIALIZER.serialize( + null, + ValueAndTimestamp.make(versionedRecord.value(), versionedRecord.timestamp())); Review Comment: Similar to above: why do we actually use `ValueAndTimestamp` ? ########## streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedStoreQueryUtils.java: ########## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.time.temporal.ChronoField; +import java.util.Map; +import java.util.function.Function; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.query.FailureReason; +import org.apache.kafka.streams.query.Position; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryConfig; +import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.query.VersionedKeyQuery; +import org.apache.kafka.streams.state.StateSerdes; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.VersionedKeyValueStore; +import org.apache.kafka.streams.state.VersionedRecord; + +public final class VersionedStoreQueryUtils { + + /** + * a utility interface to facilitate stores' query dispatch logic, + * allowing them to generically store query execution logic as the values + * in a map. 
+ */ + @FunctionalInterface + public interface QueryHandler { + QueryResult<?> apply( + final Query<?> query, + final PositionBound positionBound, + final QueryConfig config, + final StateStore store + ); + } + + @SuppressWarnings("rawtypes") + private static final Map<Class, QueryHandler> QUERY_HANDLER_MAP = + mkMap( + mkEntry( + VersionedKeyQuery.class, + VersionedStoreQueryUtils::runKeyQuery + ) + ); + + // make this class uninstantiable + + private VersionedStoreQueryUtils() { + } + @SuppressWarnings("unchecked") + public static <R> QueryResult<R> handleBasicQueries( + final Query<R> query, + final PositionBound positionBound, + final QueryConfig config, + final StateStore store, + final Position position, + final StateStoreContext context + ) { + + final long start = config.isCollectExecutionInfo() ? System.nanoTime() : -1L; + final QueryResult<R> result; + + final QueryHandler handler = QUERY_HANDLER_MAP.get(query.getClass()); + if (handler == null) { + result = QueryResult.forUnknownQueryType(query, store); + } else if (context == null || !isPermitted(position, positionBound, context.taskId().partition())) { + result = QueryResult.notUpToBound( + position, + positionBound, + context == null ? 
null : context.taskId().partition() + ); + } else { + result = (QueryResult<R>) handler.apply( + query, + positionBound, + config, + store + ); + } + if (config.isCollectExecutionInfo()) { + result.addExecutionInfo( + "Handled in " + store.getClass() + " in " + (System.nanoTime() - start) + "ns" + ); + } + result.setPosition(position); + return result; + } + + public static boolean isPermitted( + final Position position, + final PositionBound positionBound, + final int partition + ) { + final Position bound = positionBound.position(); + for (final String topic : bound.getTopics()) { + final Map<Integer, Long> partitionBounds = bound.getPartitionPositions(topic); + final Map<Integer, Long> seenPartitionPositions = position.getPartitionPositions(topic); + if (!partitionBounds.containsKey(partition)) { + // this topic isn't bounded for our partition, so just skip over it. + } else { + if (!seenPartitionPositions.containsKey(partition)) { + // we haven't seen a partition that we have a bound for + return false; + } else if (seenPartitionPositions.get(partition) < partitionBounds.get(partition)) { + // our current position is behind the bound + return false; + } + } + } + return true; + } + + @SuppressWarnings("unchecked") + private static <R> QueryResult<R> runKeyQuery(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config, + final StateStore store) { + if (store instanceof VersionedKeyValueStore) { + final VersionedKeyValueStore<Bytes, byte[]> versionedKeyValueStore = + (VersionedKeyValueStore<Bytes, byte[]>) store; + if (query instanceof VersionedKeyQuery) { + final VersionedKeyQuery<Bytes, byte[]> rawKeyQuery = + (VersionedKeyQuery<Bytes, byte[]>) query; + try { + final VersionedRecord<byte[]> bytes; + if (((VersionedKeyQuery<?, ?>) query).asOfTimestamp().isPresent()) { + bytes = versionedKeyValueStore.get(rawKeyQuery.key(), + ((VersionedKeyQuery<?, ?>) query).asOfTimestamp().get() + .getLong(ChronoField.INSTANT_SECONDS)); + } else { + 
bytes = versionedKeyValueStore.get(rawKeyQuery.key()); + } + return (QueryResult<R>) QueryResult.forResult(bytes); + } catch (final Exception e) { + final String message = parseStoreException(e, store, query); + return QueryResult.forFailure( + FailureReason.STORE_EXCEPTION, + message + ); + } + } else { + // Here is preserved for VersionedMultiTimestampedKeyQuery + return QueryResult.forUnknownQueryType(query, store); + } + } else { + return QueryResult.forUnknownQueryType(query, store); + } + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + public static <V> Function<byte[], ValueAndTimestamp<V>> getDeserializeValue( + final StateSerdes<?, V> serdes) { Review Comment: nit: Why is this on its own line? ########## streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedStoreQueryUtils.java: ########## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.time.temporal.ChronoField; +import java.util.Map; +import java.util.function.Function; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.query.FailureReason; +import org.apache.kafka.streams.query.Position; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryConfig; +import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.query.VersionedKeyQuery; +import org.apache.kafka.streams.state.StateSerdes; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.VersionedKeyValueStore; +import org.apache.kafka.streams.state.VersionedRecord; + +public final class VersionedStoreQueryUtils { + + /** + * a utility interface to facilitate stores' query dispatch logic, + * allowing them to generically store query execution logic as the values + * in a map. 
+ */ + @FunctionalInterface + public interface QueryHandler { + QueryResult<?> apply( + final Query<?> query, + final PositionBound positionBound, + final QueryConfig config, + final StateStore store + ); + } + + @SuppressWarnings("rawtypes") + private static final Map<Class, QueryHandler> QUERY_HANDLER_MAP = + mkMap( + mkEntry( + VersionedKeyQuery.class, + VersionedStoreQueryUtils::runKeyQuery + ) + ); + + // make this class uninstantiable + + private VersionedStoreQueryUtils() { + } + @SuppressWarnings("unchecked") + public static <R> QueryResult<R> handleBasicQueries( + final Query<R> query, + final PositionBound positionBound, + final QueryConfig config, + final StateStore store, + final Position position, + final StateStoreContext context + ) { + + final long start = config.isCollectExecutionInfo() ? System.nanoTime() : -1L; + final QueryResult<R> result; + + final QueryHandler handler = QUERY_HANDLER_MAP.get(query.getClass()); + if (handler == null) { + result = QueryResult.forUnknownQueryType(query, store); + } else if (context == null || !isPermitted(position, positionBound, context.taskId().partition())) { + result = QueryResult.notUpToBound( + position, + positionBound, + context == null ? 
null : context.taskId().partition() + ); + } else { + result = (QueryResult<R>) handler.apply( + query, + positionBound, + config, + store + ); + } + if (config.isCollectExecutionInfo()) { + result.addExecutionInfo( + "Handled in " + store.getClass() + " in " + (System.nanoTime() - start) + "ns" + ); + } + result.setPosition(position); + return result; + } + + public static boolean isPermitted( + final Position position, + final PositionBound positionBound, + final int partition + ) { + final Position bound = positionBound.position(); + for (final String topic : bound.getTopics()) { + final Map<Integer, Long> partitionBounds = bound.getPartitionPositions(topic); + final Map<Integer, Long> seenPartitionPositions = position.getPartitionPositions(topic); + if (!partitionBounds.containsKey(partition)) { + // this topic isn't bounded for our partition, so just skip over it. + } else { + if (!seenPartitionPositions.containsKey(partition)) { + // we haven't seen a partition that we have a bound for + return false; + } else if (seenPartitionPositions.get(partition) < partitionBounds.get(partition)) { + // our current position is behind the bound + return false; + } + } + } + return true; + } + + @SuppressWarnings("unchecked") + private static <R> QueryResult<R> runKeyQuery(final Query<R> query, Review Comment: should this be `runVersionedKeyQuery` ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org