This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch iqv2-key-query
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit bae3a36ab94d1c25ea0adccd21977b9653c30e63
Author: John Roesler <[email protected]>
AuthorDate: Sun Dec 5 15:48:20 2021 -0600

    KAFKA-13525: Implement KeyQuery in Streams IQv2
    
    Implement the KeyQuery and RawKeyQuery as proposed in KIP-796
---
 .../apache/kafka/streams/query/FailureReason.java  |   9 +-
 .../org/apache/kafka/streams/query/KeyQuery.java   |  37 +++++++
 .../apache/kafka/streams/query/QueryResult.java    |  14 ++-
 .../apache/kafka/streams/query/RawKeyQuery.java    |  42 ++++++++
 .../state/internals/MeteredKeyValueStore.java      |  80 +++++++++++++++
 .../streams/state/internals/StoreQueryUtils.java   |  88 +++++++++++++---
 .../integration/IQv2StoreIntegrationTest.java      | 114 +++++++++++++++++++++
 7 files changed, 367 insertions(+), 17 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java 
b/streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java
index 6ec319d..c250f1c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java
@@ -52,5 +52,12 @@ public enum FailureReason {
      * The requested store partition does not exist at all. For example, 
partition 4 was requested,
      * but the store in question only has 4 partitions (0 through 3).
      */
-    DOES_NOT_EXIST;
+    DOES_NOT_EXIST,
+
+    /**
+     * The store that handled the query got an exception during query 
execution. The message
+     * will contain the exception details. Depending on the nature of the 
exception, the caller
+     * may be able to retry this instance or may need to try a different 
instance.
+     */
+    STORE_EXCEPTION;
 }
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/KeyQuery.java 
b/streams/src/main/java/org/apache/kafka/streams/query/KeyQuery.java
new file mode 100644
index 0000000..b183929
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/query/KeyQuery.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+@Evolving
+public class KeyQuery<K, V> implements Query<V> {
+
+    private final K key;
+
+    private KeyQuery(final K key) {
+        this.key = key;
+    }
+
+    public static <K, V> KeyQuery<K, V> withKey(final K key) {
+        return new KeyQuery<>(key);
+    }
+
+    public K getKey() {
+        return key;
+    }
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java 
b/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
index 780ea86..38bbb2c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
@@ -29,10 +29,10 @@ import java.util.List;
  */
 public final class QueryResult<R> {
 
-    private final List<String> executionInfo = new LinkedList<>();
     private final FailureReason failureReason;
     private final String failure;
     private final R result;
+    private List<String> executionInfo = new LinkedList<>();
     private Position position;
 
     private QueryResult(final R result) {
@@ -197,6 +197,18 @@ public final class QueryResult<R> {
         return result;
     }
 
+    @SuppressWarnings("unchecked")
+    public <V> QueryResult<V> swapResult(final V value) {
+        if (isFailure()) {
+            return (QueryResult<V>) this;
+        } else {
+            final QueryResult<V> result = new QueryResult<>(value);
+            result.executionInfo = executionInfo;
+            result.position = position;
+            return result;
+        }
+    }
+
     @Override
     public String toString() {
         return "QueryResult{" +
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/query/RawKeyQuery.java 
b/streams/src/main/java/org/apache/kafka/streams/query/RawKeyQuery.java
new file mode 100644
index 0000000..80bd4e3
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/query/RawKeyQuery.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.common.utils.Bytes;
+
+@Evolving
+public class RawKeyQuery implements Query<byte[]> {
+
+    private final Bytes key;
+
+    private RawKeyQuery(final Bytes key) {
+        this.key = key;
+    }
+
+    public static RawKeyQuery withKey(final Bytes key) {
+        return new RawKeyQuery(key);
+    }
+
+    public static RawKeyQuery withKey(final byte[] key) {
+        return new RawKeyQuery(Bytes.wrap(key));
+    }
+
+    public Bytes getKey() {
+        return key;
+    }
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 937288c..f382a35 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
@@ -34,15 +35,24 @@ import 
org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.SerdeGetter;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RawKeyQuery;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.state.internals.StoreQueryUtils.QueryHandler;
 import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
 import static 
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde;
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
 
@@ -79,6 +89,14 @@ public class MeteredKeyValueStore<K, V>
     private StreamsMetricsImpl streamsMetrics;
     private TaskId taskId;
 
+    private  Map<Class, QueryHandler> queryHandlers =
+        mkMap(
+            mkEntry(
+                KeyQuery.class,
+                (query, positionBound, collectExecutionInfo, store) -> 
runKeyQuery(query, positionBound, collectExecutionInfo)
+            )
+        );
+
     MeteredKeyValueStore(final KeyValueStore<Bytes, byte[]> inner,
                          final String metricsScope,
                          final Time time,
@@ -186,6 +204,68 @@ public class MeteredKeyValueStore<K, V>
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, 
collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - 
start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + 
"ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query query,
+        final PositionBound positionBound, final boolean collectExecutionInfo) 
{
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedQuery = (KeyQuery<K, V>) query;
+        final RawKeyQuery rawKeyQuery = 
RawKeyQuery.withKey(keyBytes(typedQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (rawResult.isSuccess()) {
+            final boolean timestamped = 
WrappedStateStore.isTimestamped(wrapped());
+            final Serde<V> vSerde = serdes.valueSerde();
+            final Deserializer<V> deserializer;
+            if (!timestamped && vSerde instanceof ValueAndTimestampSerde) {
+                final ValueAndTimestampDeserializer 
valueAndTimestampDeserializer =
+                    (ValueAndTimestampDeserializer) ((ValueAndTimestampSerde) 
vSerde).deserializer();
+                deserializer = (Deserializer<V>) 
valueAndTimestampDeserializer.valueDeserializer;
+            } else {
+                deserializer = vSerde.deserializer();
+            }
+            final V value = deserializer.deserialize(serdes.topic(), 
rawResult.getResult());
+            final QueryResult<V> typedQueryResult =
+                rawResult.swapResult(value);
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no 
result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
     @Override
     public V get(final K key) {
         Objects.requireNonNull(key, "key cannot be null");
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
index 006981e..341f69f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
@@ -19,16 +19,73 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.query.FailureReason;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RawKeyQuery;
+import org.apache.kafka.streams.state.KeyValueStore;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.Map;
-import java.util.Map.Entry;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
 
 public final class StoreQueryUtils {
 
+    /**
+     * a utility interface to facilitate stores' query dispatch logic,
+     * allowing them to generically store query execution logic as the values
+     * in a map.
+     */
+    @FunctionalInterface
+    public interface QueryHandler {
+        QueryResult<?> apply(
+            final Query<?> query,
+            final PositionBound positionBound,
+            final boolean collectExecutionInfo,
+            final StateStore store
+        );
+    }
+
+
+    private static Map<Class, QueryHandler> queryHandlers =
+        mkMap(
+            mkEntry(
+                PingQuery.class,
+                (query, positionBound, collectExecutionInfo, store) -> 
QueryResult.forResult(true)
+            ),
+            mkEntry(RawKeyQuery.class,
+                (query, positionBound, collectExecutionInfo, store) -> {
+                    if (store instanceof KeyValueStore) {
+                        final RawKeyQuery rawKeyQuery = (RawKeyQuery) query;
+                        final KeyValueStore keyValueStore = (KeyValueStore) 
store;
+                        try {
+                            @SuppressWarnings("unchecked") final byte[] bytes =
+                                (byte[]) 
keyValueStore.get(rawKeyQuery.getKey());
+                            return QueryResult.forResult(bytes);
+                        } catch (final Throwable t) {
+                            final StringWriter stringWriter = new 
StringWriter();
+                            final PrintWriter printWriter = new 
PrintWriter(stringWriter);
+                            printWriter.println(
+                                store.getClass() + " failed to handle query " 
+ query + ":");
+                            t.printStackTrace(printWriter);
+                            printWriter.flush();
+                            final String message = stringWriter.toString();
+                            return QueryResult.forFailure(
+                                FailureReason.STORE_EXCEPTION,
+                                message
+                            );
+                        }
+                    } else {
+                        return QueryResult.forUnknownQueryType(query, store);
+                    }
+                })
+        );
+
     // make this class uninstantiable
     private StoreQueryUtils() {
     }
@@ -43,16 +100,21 @@ public final class StoreQueryUtils {
         final int partition
     ) {
 
-        final QueryResult<R> result;
         final long start = collectExecutionInfo ? System.nanoTime() : -1L;
-        if (query instanceof PingQuery) {
-            if (!isPermitted(position, positionBound, partition)) {
-                result = QueryResult.notUpToBound(position, positionBound, 
partition);
-            } else {
-                result = (QueryResult<R>) QueryResult.forResult(true);
-            }
-        } else {
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
             result = QueryResult.forUnknownQueryType(query, store);
+        } else if (!isPermitted(position, positionBound, partition)) {
+            result = QueryResult.notUpToBound(position, positionBound, 
partition);
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                store
+            );
         }
         if (collectExecutionInfo) {
             result.addExecutionInfo(
@@ -88,14 +150,10 @@ public final class StoreQueryUtils {
                 if (!partitionBounds.containsKey(partition)) {
                     // this topic isn't bounded for our partition, so just 
skip over it.
                 } else {
-                    if (seenPartitionBounds == null) {
-                        // we haven't seen a topic that is bounded for our 
partition
-                        return false;
-                    } else if (!seenPartitionBounds.containsKey(partition)) {
+                    if (!seenPartitionBounds.containsKey(partition)) {
                         // we haven't seen a partition that we have a bound for
                         return false;
-                    } else if (seenPartitionBounds.get(partition) < 
partitionBounds.get(
-                        partition)) {
+                    } else if (seenPartitionBounds.get(partition) < 
partitionBounds.get(partition)) {
                         // our current position is behind the bound
                         return false;
                     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
index 5a2e67b..97e030f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.KeyQuery;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
@@ -48,6 +49,7 @@ import 
org.apache.kafka.streams.state.SessionBytesStoreSupplier;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.StoreSupplier;
 import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.internals.PingQuery;
@@ -77,6 +79,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
+import java.util.function.Function;
 
 import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.apache.kafka.streams.query.StateQueryRequest.inStore;
@@ -122,6 +125,11 @@ public class IQv2StoreIntegrationTest {
             public boolean global() {
                 return true;
             }
+
+            @Override
+            public boolean keyValue() {
+                return true;
+            }
         },
         GLOBAL_IN_MEMORY_LRU {
             @Override
@@ -133,6 +141,11 @@ public class IQv2StoreIntegrationTest {
             public boolean global() {
                 return true;
             }
+
+            @Override
+            public boolean keyValue() {
+                return true;
+            }
         },
         GLOBAL_ROCKS_KV {
             @Override
@@ -141,9 +154,19 @@ public class IQv2StoreIntegrationTest {
             }
 
             @Override
+            public boolean timestamped() {
+                return false;
+            }
+
+            @Override
             public boolean global() {
                 return true;
             }
+
+            @Override
+            public boolean keyValue() {
+                return true;
+            }
         },
         GLOBAL_TIME_ROCKS_KV {
             @Override
@@ -155,30 +178,60 @@ public class IQv2StoreIntegrationTest {
             public boolean global() {
                 return true;
             }
+
+            @Override
+            public boolean keyValue() {
+                return true;
+            }
         },
         IN_MEMORY_KV {
             @Override
             public StoreSupplier<?> supplier() {
                 return Stores.inMemoryKeyValueStore(STORE_NAME);
             }
+
+            @Override
+            public boolean keyValue() {
+                return true;
+            }
         },
         IN_MEMORY_LRU {
             @Override
             public StoreSupplier<?> supplier() {
                 return Stores.lruMap(STORE_NAME, 100);
             }
+
+            @Override
+            public boolean keyValue() {
+                return true;
+            }
         },
         ROCKS_KV {
             @Override
             public StoreSupplier<?> supplier() {
                 return Stores.persistentKeyValueStore(STORE_NAME);
             }
+
+            @Override
+            public boolean timestamped() {
+                return false;
+            }
+
+            @Override
+            public boolean keyValue() {
+                return true;
+            }
         },
         TIME_ROCKS_KV {
             @Override
             public StoreSupplier<?> supplier() {
                 return Stores.persistentTimestampedKeyValueStore(STORE_NAME);
             }
+
+            @Override
+            public boolean keyValue() {
+                return true;
+            }
         },
         IN_MEMORY_WINDOW {
             @Override
@@ -216,9 +269,17 @@ public class IQv2StoreIntegrationTest {
 
         public abstract StoreSupplier<?> supplier();
 
+        public boolean timestamped() {
+            return true; // most stores are timestamped
+        };
+
         public boolean global() {
             return false;
         }
+
+        public boolean keyValue() {
+            return false;
+        }
     }
 
     @Parameterized.Parameters(name = "cache={0}, log={1}, supplier={2}")
@@ -426,6 +487,22 @@ public class IQv2StoreIntegrationTest {
             shouldHandlePingQuery();
             shouldCollectExecutionInfo();
             shouldCollectExecutionInfoUnderFailure();
+
+            if (storeToTest.keyValue()) {
+                if (storeToTest.timestamped()) {
+                    shouldHandleKeyQuery(
+                        2,
+                        (Function<ValueAndTimestamp<Integer>, Integer>) 
ValueAndTimestamp::value,
+                        2
+                    );
+                } else {
+                    shouldHandleKeyQuery(
+                        2,
+                        Function.identity(),
+                        2
+                    );
+                }
+            }
         }
     }
 
@@ -513,6 +590,43 @@ public class IQv2StoreIntegrationTest {
         assertThat(result.getPosition(), is(INPUT_POSITION));
     }
 
+    public <V> void shouldHandleKeyQuery(
+        final Integer key,
+        final Function<V, Integer> valueExtactor,
+        final Integer expectedValue) {
+
+        final KeyQuery<Integer, V> query = KeyQuery.withKey(key);
+        final StateQueryRequest<V> request =
+            inStore(STORE_NAME)
+                .withQuery(query)
+                .withPartitions(mkSet(0, 1))
+                .withPositionBound(PositionBound.at(INPUT_POSITION));
+
+        final StateQueryResult<V> result =
+            IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+
+        final QueryResult<V> queryResult =
+            result.getGlobalResult() != null
+                ? result.getGlobalResult()
+                : result.getOnlyPartitionResult();
+        final boolean failure = queryResult.isFailure();
+        if (failure) {
+            throw new AssertionError(queryResult.toString());
+        }
+        assertThat(queryResult.isSuccess(), is(true));
+
+        assertThrows(IllegalArgumentException.class, 
queryResult::getFailureReason);
+        assertThrows(IllegalArgumentException.class,
+            queryResult::getFailureMessage);
+
+        final V result1 = queryResult.getResult();
+        final Integer integer = valueExtactor.apply(result1);
+        assertThat(integer, is(expectedValue));
+
+        assertThat(queryResult.getExecutionInfo(), is(empty()));
+
+    }
+
     public void shouldCollectExecutionInfo() {
 
         final PingQuery query = new PingQuery();

Reply via email to