This is an automated email from the ASF dual-hosted git repository. vvcephei pushed a commit to branch iqv2-key-query in repository https://gitbox.apache.org/repos/asf/kafka.git
commit bae3a36ab94d1c25ea0adccd21977b9653c30e63 Author: John Roesler <[email protected]> AuthorDate: Sun Dec 5 15:48:20 2021 -0600 KAFKA-13525: Implement KeyQuery in Streams IQv2 Implement the KeyQuery and RawKeyQuery as proposed in KIP-796 --- .../apache/kafka/streams/query/FailureReason.java | 9 +- .../org/apache/kafka/streams/query/KeyQuery.java | 37 +++++++ .../apache/kafka/streams/query/QueryResult.java | 14 ++- .../apache/kafka/streams/query/RawKeyQuery.java | 42 ++++++++ .../state/internals/MeteredKeyValueStore.java | 80 +++++++++++++++ .../streams/state/internals/StoreQueryUtils.java | 88 +++++++++++++--- .../integration/IQv2StoreIntegrationTest.java | 114 +++++++++++++++++++++ 7 files changed, 367 insertions(+), 17 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java b/streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java index 6ec319d..c250f1c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java +++ b/streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java @@ -52,5 +52,12 @@ public enum FailureReason { * The requested store partition does not exist at all. For example, partition 4 was requested, * but the store in question only has 4 partitions (0 through 3). */ - DOES_NOT_EXIST; + DOES_NOT_EXIST, + + /** + * The store that handled the query got an exception during query execution. The message + * will contain the exception details. Depending on the nature of the exception, the caller + * may be able to retry this instance or may need to try a different instance. + */ + STORE_EXCEPTION; } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/query/KeyQuery.java b/streams/src/main/java/org/apache/kafka/streams/query/KeyQuery.java new file mode 100644 index 0000000..b183929 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/query/KeyQuery.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; + +@Evolving +public class KeyQuery<K, V> implements Query<V> { + + private final K key; + + private KeyQuery(final K key) { + this.key = key; + } + + public static <K, V> KeyQuery<K, V> withKey(final K key) { + return new KeyQuery<>(key); + } + + public K getKey() { + return key; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java b/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java index 780ea86..38bbb2c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java +++ b/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java @@ -29,10 +29,10 @@ import java.util.List; */ public final class QueryResult<R> { - private final List<String> executionInfo = new LinkedList<>(); private final FailureReason failureReason; private final String failure; private final R result; + private List<String> executionInfo = new LinkedList<>(); private Position position; private QueryResult(final R result) { @@ -197,6 +197,18 @@ public final class QueryResult<R> { return result; } + @SuppressWarnings("unchecked") + public <V> QueryResult<V> swapResult(final V value) { + if (isFailure()) { + return (QueryResult<V>) this; + } else { + final QueryResult<V> result = new QueryResult<>(value); + result.executionInfo = executionInfo; + result.position = position; + return result; + } + } + @Override public String toString() { return "QueryResult{" + diff --git a/streams/src/main/java/org/apache/kafka/streams/query/RawKeyQuery.java b/streams/src/main/java/org/apache/kafka/streams/query/RawKeyQuery.java new file mode 100644 index 0000000..80bd4e3 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/query/RawKeyQuery.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; +import org.apache.kafka.common.utils.Bytes; + +@Evolving +public class RawKeyQuery implements Query<byte[]> { + + private final Bytes key; + + private RawKeyQuery(final Bytes key) { + this.key = key; + } + + public static RawKeyQuery withKey(final Bytes key) { + return new RawKeyQuery(key); + } + + public static RawKeyQuery withKey(final byte[] key) { + return new RawKeyQuery(Bytes.wrap(key)); + } + + public Bytes getKey() { + return key; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index 937288c..f382a35 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; @@ -34,15 +35,24 @@ import org.apache.kafka.streams.processor.internals.ProcessorContextUtils; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.SerdeGetter; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.query.KeyQuery; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.query.RawKeyQuery; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StateSerdes; +import org.apache.kafka.streams.state.internals.StoreQueryUtils.QueryHandler; import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Objects; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; @@ -79,6 +89,14 @@ public class MeteredKeyValueStore<K, V> private StreamsMetricsImpl streamsMetrics; private TaskId taskId; + private Map<Class, QueryHandler> queryHandlers = + mkMap( + mkEntry( + KeyQuery.class, + (query, positionBound, collectExecutionInfo, store) -> runKeyQuery(query, positionBound, collectExecutionInfo) + ) + ); + MeteredKeyValueStore(final KeyValueStore<Bytes, byte[]> inner, final String metricsScope, final Time time, @@ -186,6 +204,68 @@ public class MeteredKeyValueStore<K, V> return false; } + @SuppressWarnings("unchecked") + @Override + public <R> QueryResult<R> query(final Query<R> query, + final PositionBound positionBound, + final boolean collectExecutionInfo) { + + final long start = System.nanoTime(); + final QueryResult<R> result; + + final QueryHandler handler = queryHandlers.get(query.getClass()); + if (handler == null) { + result = wrapped().query(query, positionBound, collectExecutionInfo); + if (collectExecutionInfo) { + result.addExecutionInfo( + "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns"); + } + } else { + result = (QueryResult<R>) handler.apply( + query, + positionBound, + collectExecutionInfo, + this + ); + if (collectExecutionInfo) { + result.addExecutionInfo( + "Handled in " + getClass() + " with serdes " + + serdes + " in " + (System.nanoTime() - start) + "ns"); + } + } + return result; + } + + @SuppressWarnings("unchecked") + private <R> QueryResult<R> runKeyQuery(final Query query, + final PositionBound positionBound, final boolean collectExecutionInfo) { + final QueryResult<R> result; + final KeyQuery<K, V> typedQuery = (KeyQuery<K, V>) query; + final RawKeyQuery rawKeyQuery = RawKeyQuery.withKey(keyBytes(typedQuery.getKey())); + final QueryResult<byte[]> rawResult = + wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo); + if (rawResult.isSuccess()) { + final boolean timestamped = WrappedStateStore.isTimestamped(wrapped()); + final Serde<V> vSerde = serdes.valueSerde(); + final Deserializer<V> deserializer; + if (!timestamped && vSerde instanceof ValueAndTimestampSerde) { + final ValueAndTimestampDeserializer valueAndTimestampDeserializer = + (ValueAndTimestampDeserializer) ((ValueAndTimestampSerde) vSerde).deserializer(); + deserializer = (Deserializer<V>) valueAndTimestampDeserializer.valueDeserializer; + } else { + deserializer = vSerde.deserializer(); + } + final V value = deserializer.deserialize(serdes.topic(), rawResult.getResult()); + final QueryResult<V> typedQueryResult = + rawResult.swapResult(value); + result = (QueryResult<R>) typedQueryResult; + } else { + // the generic type doesn't matter, since failed queries have no result set. + result = (QueryResult<R>) rawResult; + } + return result; + } + @Override public V get(final K key) { Objects.requireNonNull(key, "key cannot be null"); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java index 006981e..341f69f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java @@ -19,16 +19,73 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.api.RecordMetadata; +import org.apache.kafka.streams.query.FailureReason; import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.query.PositionBound; import org.apache.kafka.streams.query.Query; import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.query.RawKeyQuery; +import org.apache.kafka.streams.state.KeyValueStore; +import java.io.PrintWriter; +import java.io.StringWriter; import java.util.Map; -import java.util.Map.Entry; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; public final class StoreQueryUtils { + /** + * a utility interface to facilitate stores' query dispatch logic, + * allowing them to generically store query execution logic as the values + * in a map. + */ + @FunctionalInterface + public interface QueryHandler { + QueryResult<?> apply( + final Query<?> query, + final PositionBound positionBound, + final boolean collectExecutionInfo, + final StateStore store + ); + } + + + private static Map<Class, QueryHandler> queryHandlers = + mkMap( + mkEntry( + PingQuery.class, + (query, positionBound, collectExecutionInfo, store) -> QueryResult.forResult(true) + ), + mkEntry(RawKeyQuery.class, + (query, positionBound, collectExecutionInfo, store) -> { + if (store instanceof KeyValueStore) { + final RawKeyQuery rawKeyQuery = (RawKeyQuery) query; + final KeyValueStore keyValueStore = (KeyValueStore) store; + try { + @SuppressWarnings("unchecked") final byte[] bytes = + (byte[]) keyValueStore.get(rawKeyQuery.getKey()); + return QueryResult.forResult(bytes); + } catch (final Throwable t) { + final StringWriter stringWriter = new StringWriter(); + final PrintWriter printWriter = new PrintWriter(stringWriter); + printWriter.println( + store.getClass() + " failed to handle query " + query + ":"); + t.printStackTrace(printWriter); + printWriter.flush(); + final String message = stringWriter.toString(); + return QueryResult.forFailure( + FailureReason.STORE_EXCEPTION, + message + ); + } + } else { + return QueryResult.forUnknownQueryType(query, store); + } + }) + ); + // make this class uninstantiable private StoreQueryUtils() { } @@ -43,16 +100,21 @@ public final class StoreQueryUtils { final int partition ) { - final QueryResult<R> result; final long start = collectExecutionInfo ? System.nanoTime() : -1L; - if (query instanceof PingQuery) { - if (!isPermitted(position, positionBound, partition)) { - result = QueryResult.notUpToBound(position, positionBound, partition); - } else { - result = (QueryResult<R>) QueryResult.forResult(true); - } - } else { + final QueryResult<R> result; + + final QueryHandler handler = queryHandlers.get(query.getClass()); + if (handler == null) { result = QueryResult.forUnknownQueryType(query, store); + } else if (!isPermitted(position, positionBound, partition)) { + result = QueryResult.notUpToBound(position, positionBound, partition); + } else { + result = (QueryResult<R>) handler.apply( + query, + positionBound, + collectExecutionInfo, + store + ); } if (collectExecutionInfo) { result.addExecutionInfo( @@ -88,14 +150,10 @@ public final class StoreQueryUtils { if (!partitionBounds.containsKey(partition)) { // this topic isn't bounded for our partition, so just skip over it. } else { - if (seenPartitionBounds == null) { - // we haven't seen a topic that is bounded for our partition - return false; - } else if (!seenPartitionBounds.containsKey(partition)) { + if (!seenPartitionBounds.containsKey(partition)) { // we haven't seen a partition that we have a bound for return false; - } else if (seenPartitionBounds.get(partition) < partitionBounds.get( - partition)) { + } else if (seenPartitionBounds.get(partition) < partitionBounds.get(partition)) { // our current position is behind the bound return false; } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java index 5a2e67b..97e030f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java @@ -36,6 +36,7 @@ import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.SessionWindows; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.query.FailureReason; +import org.apache.kafka.streams.query.KeyQuery; import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.query.PositionBound; import org.apache.kafka.streams.query.Query; @@ -48,6 +49,7 @@ import org.apache.kafka.streams.state.SessionBytesStoreSupplier; import org.apache.kafka.streams.state.SessionStore; import org.apache.kafka.streams.state.StoreSupplier; import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.WindowBytesStoreSupplier; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.internals.PingQuery; @@ -77,6 +79,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; +import java.util.function.Function; import static org.apache.kafka.common.utils.Utils.mkSet; import static org.apache.kafka.streams.query.StateQueryRequest.inStore; @@ -122,6 +125,11 @@ public class IQv2StoreIntegrationTest { public boolean global() { return true; } + + @Override + public boolean keyValue() { + return true; + } }, GLOBAL_IN_MEMORY_LRU { @Override @@ -133,6 +141,11 @@ public class IQv2StoreIntegrationTest { public boolean global() { return true; } + + @Override + public boolean keyValue() { + return true; + } }, GLOBAL_ROCKS_KV { @Override @@ -141,9 +154,19 @@ public class IQv2StoreIntegrationTest { } @Override + public boolean timestamped() { + return false; + } + + @Override public boolean global() { return true; } + + @Override + public boolean keyValue() { + return true; + } }, GLOBAL_TIME_ROCKS_KV { @Override @@ -155,30 +178,60 @@ public class IQv2StoreIntegrationTest { public boolean global() { return true; } + + @Override + public boolean keyValue() { + return true; + } }, IN_MEMORY_KV { @Override public StoreSupplier<?> supplier() { return Stores.inMemoryKeyValueStore(STORE_NAME); } + + @Override + public boolean keyValue() { + return true; + } }, IN_MEMORY_LRU { @Override public StoreSupplier<?> supplier() { return Stores.lruMap(STORE_NAME, 100); } + + @Override + public boolean keyValue() { + return true; + } }, ROCKS_KV { @Override public StoreSupplier<?> supplier() { return Stores.persistentKeyValueStore(STORE_NAME); } + + @Override + public boolean timestamped() { + return false; + } + + @Override + public boolean keyValue() { + return true; + } }, TIME_ROCKS_KV { @Override public StoreSupplier<?> supplier() { return Stores.persistentTimestampedKeyValueStore(STORE_NAME); } + + @Override + public boolean keyValue() { + return true; + } }, IN_MEMORY_WINDOW { @Override @@ -216,9 +269,17 @@ public class IQv2StoreIntegrationTest { public abstract StoreSupplier<?> supplier(); + public boolean timestamped() { + return true; // most stores are timestamped + }; + public boolean global() { return false; } + + public boolean keyValue() { + return false; + } } @Parameterized.Parameters(name = "cache={0}, log={1}, supplier={2}") @@ -426,6 +487,22 @@ public class IQv2StoreIntegrationTest { shouldHandlePingQuery(); shouldCollectExecutionInfo(); shouldCollectExecutionInfoUnderFailure(); + + if (storeToTest.keyValue()) { + if (storeToTest.timestamped()) { + shouldHandleKeyQuery( + 2, + (Function<ValueAndTimestamp<Integer>, Integer>) ValueAndTimestamp::value, + 2 + ); + } else { + shouldHandleKeyQuery( + 2, + Function.identity(), + 2 + ); + } + } } } @@ -513,6 +590,43 @@ public class IQv2StoreIntegrationTest { assertThat(result.getPosition(), is(INPUT_POSITION)); } + public <V> void shouldHandleKeyQuery( + final Integer key, + final Function<V, Integer> valueExtactor, + final Integer expectedValue) { + + final KeyQuery<Integer, V> query = KeyQuery.withKey(key); + final StateQueryRequest<V> request = + inStore(STORE_NAME) + .withQuery(query) + .withPartitions(mkSet(0, 1)) + .withPositionBound(PositionBound.at(INPUT_POSITION)); + + final StateQueryResult<V> result = + IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); + + final QueryResult<V> queryResult = + result.getGlobalResult() != null + ? result.getGlobalResult() + : result.getOnlyPartitionResult(); + final boolean failure = queryResult.isFailure(); + if (failure) { + throw new AssertionError(queryResult.toString()); + } + assertThat(queryResult.isSuccess(), is(true)); + + assertThrows(IllegalArgumentException.class, queryResult::getFailureReason); + assertThrows(IllegalArgumentException.class, + queryResult::getFailureMessage); + + final V result1 = queryResult.getResult(); + final Integer integer = valueExtactor.apply(result1); + assertThat(integer, is(expectedValue)); + + assertThat(queryResult.getExecutionInfo(), is(empty())); + + } + public void shouldCollectExecutionInfo() { final PingQuery query = new PingQuery();
