This is an automated email from the ASF dual-hosted git repository. vvcephei pushed a commit to branch iqv2-move-swapresult in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 96453f359d3d3b2416805e75cd0638051e4fc50f Author: John Roesler <[email protected]> AuthorDate: Mon Dec 20 12:43:46 2021 -0600 KAFKA-13557: Remove swapResult from the public API --- .../streams/query/InternalQueryResultUtil.java | 49 ++++++++++++++++++++++ .../apache/kafka/streams/query/QueryResult.java | 24 +++++------ .../state/internals/MeteredKeyValueStore.java | 11 +++-- 3 files changed, 67 insertions(+), 17 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/query/InternalQueryResultUtil.java b/streams/src/main/java/org/apache/kafka/streams/query/InternalQueryResultUtil.java new file mode 100644 index 0000000..fc81126 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/query/InternalQueryResultUtil.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.query; + +import org.apache.kafka.common.annotation.InterfaceStability.Unstable; + +import java.util.List; + +/** + * This utility class is an internal API, but it must be located in the same package as + * {@link QueryResult} so that it can access the package-private constructor + * {@link QueryResult#QueryResult(Object, List, Position)}. + */ +@Unstable +public final class InternalQueryResultUtil { + // uninstantiable utility class + private InternalQueryResultUtil() {} + + public static <R> QueryResult<R> copyAndSubstituteDeserializedResult( + final QueryResult<?> rawResult, + final R deserializedResult) { + + if (rawResult.isFailure()) { + throw new IllegalArgumentException( + "Callers must avoid calling this method on a failed result." + ); + } else { + return new QueryResult<>( + deserializedResult, + rawResult.getExecutionInfo(), + rawResult.getPosition() + ); + } + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java b/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java index 1e04e5b..2ffe5b7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java +++ b/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java @@ -41,6 +41,17 @@ public final class QueryResult<R> { this.failure = null; } + /** + * package-private constructor used in {@link InternalQueryResultUtil}. 
+ */ + QueryResult(final R result, final List<String> executionInfo, final Position position) { + this.result = result; + this.failureReason = null; + this.failure = null; + this.executionInfo = executionInfo; + this.position = position; + } + private QueryResult(final FailureReason failureReason, final String failure) { this.result = null; this.failureReason = failureReason; @@ -197,19 +208,6 @@ public final class QueryResult<R> { return result; } - public <V> QueryResult<V> swapResult(final V value) { - if (isFailure()) { - throw new IllegalArgumentException( - "Callers must avoid calling this method on a failed result." - ); - } else { - final QueryResult<V> result = new QueryResult<>(value); - result.executionInfo = executionInfo; - result.position = position; - return result; - } - } - @Override public String toString() { return "QueryResult{" + diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index b8bacfa..32f4550 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -35,6 +35,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorContextUtils; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.SerdeGetter; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.query.InternalQueryResultUtil; import org.apache.kafka.streams.query.KeyQuery; import org.apache.kafka.streams.query.PositionBound; import org.apache.kafka.streams.query.Query; @@ -270,9 +271,11 @@ public class MeteredKeyValueStore<K, V> getSensor, getValueDeserializer() ); - final QueryResult<KeyValueIterator<K, V>> typedQueryResult = rawResult.swapResult( - resultIterator 
- ); + final QueryResult<KeyValueIterator<K, V>> typedQueryResult = + InternalQueryResultUtil.copyAndSubstituteDeserializedResult( + rawResult, + resultIterator + ); result = (QueryResult<R>) typedQueryResult; } else { // the generic type doesn't matter, since failed queries have no result set. @@ -296,7 +299,7 @@ public class MeteredKeyValueStore<K, V> final Deserializer<V> deserializer = getValueDeserializer(); final V value = deserializer.deserialize(serdes.topic(), rawResult.getResult()); final QueryResult<V> typedQueryResult = - rawResult.swapResult(value); + InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, value); result = (QueryResult<R>) typedQueryResult; } else { // the generic type doesn't matter, since failed queries have no result set.
