This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch iqv2-move-swapresult
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 96453f359d3d3b2416805e75cd0638051e4fc50f
Author: John Roesler <[email protected]>
AuthorDate: Mon Dec 20 12:43:46 2021 -0600

    KAFKA-13557: Remove swapResult from the public API
---
 .../streams/query/InternalQueryResultUtil.java     | 49 ++++++++++++++++++++++
 .../apache/kafka/streams/query/QueryResult.java    | 24 +++++------
 .../state/internals/MeteredKeyValueStore.java      | 11 +++--
 3 files changed, 67 insertions(+), 17 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/query/InternalQueryResultUtil.java b/streams/src/main/java/org/apache/kafka/streams/query/InternalQueryResultUtil.java
new file mode 100644
index 0000000..fc81126
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/query/InternalQueryResultUtil.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.annotation.InterfaceStability.Unstable;
+
+import java.util.List;
+
+/**
+ * This utility class is an internal API, but it must be located in the same package as
+ * {@link QueryResult} so that it can access the package-private constructor
+ * {@link QueryResult#QueryResult(Object, List, Position)}
+ */
+@Unstable
+public final class InternalQueryResultUtil {
+    // uninstantiable utility class
+    private InternalQueryResultUtil() {}
+
+    public static <R> QueryResult<R> copyAndSubstituteDeserializedResult(
+        final QueryResult<?> rawResult,
+        final R deserializedResult) {
+
+        if (rawResult.isFailure()) {
+            throw new IllegalArgumentException(
+                "Callers must avoid calling this method on a failed result."
+            );
+        } else {
+            return new QueryResult<>(
+                deserializedResult,
+                rawResult.getExecutionInfo(),
+                rawResult.getPosition()
+            );
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java b/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
index 1e04e5b..2ffe5b7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
@@ -41,6 +41,17 @@ public final class QueryResult<R> {
         this.failure = null;
     }
 
+    /**
+     * Package-private constructor (failure fields left null) used in {@link InternalQueryResultUtil}.
+     */
+    QueryResult(final R result, final List<String> executionInfo, final 
Position position) {
+        this.result = result;
+        this.failureReason = null;
+        this.failure = null;
+        this.executionInfo = executionInfo;
+        this.position = position;
+    }
+
     private QueryResult(final FailureReason failureReason, final String 
failure) {
         this.result = null;
         this.failureReason = failureReason;
@@ -197,19 +208,6 @@ public final class QueryResult<R> {
         return result;
     }
 
-    public <V> QueryResult<V> swapResult(final V value) {
-        if (isFailure()) {
-            throw new IllegalArgumentException(
-                "Callers must avoid calling this method on a failed result."
-            );
-        } else {
-            final QueryResult<V> result = new QueryResult<>(value);
-            result.executionInfo = executionInfo;
-            result.position = position;
-            return result;
-        }
-    }
-
     @Override
     public String toString() {
         return "QueryResult{" +
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index b8bacfa..32f4550 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -35,6 +35,7 @@ import 
org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.SerdeGetter;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.InternalQueryResultUtil;
 import org.apache.kafka.streams.query.KeyQuery;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
@@ -270,9 +271,11 @@ public class MeteredKeyValueStore<K, V>
                 getSensor,
                 getValueDeserializer()
             );
-            final QueryResult<KeyValueIterator<K, V>> typedQueryResult = 
rawResult.swapResult(
-                resultIterator
-            );
+            final QueryResult<KeyValueIterator<K, V>> typedQueryResult =
+                InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
+                    rawResult,
+                    resultIterator
+                );
             result = (QueryResult<R>) typedQueryResult;
         } else {
             // the generic type doesn't matter, since failed queries have no 
result set.
@@ -296,7 +299,7 @@ public class MeteredKeyValueStore<K, V>
             final Deserializer<V> deserializer = getValueDeserializer();
             final V value = deserializer.deserialize(serdes.topic(), 
rawResult.getResult());
             final QueryResult<V> typedQueryResult =
-                rawResult.swapResult(value);
+                
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, value);
             result = (QueryResult<R>) typedQueryResult;
         } else {
             // the generic type doesn't matter, since failed queries have no 
result set.

Reply via email to