[
https://issues.apache.org/jira/browse/FLINK-7044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16069783#comment-16069783
]
ASF GitHub Bot commented on FLINK-7044:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/4225#discussion_r125000171
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
---
@@ -267,6 +293,177 @@ public void shutDown() {
}
/**
+ * Returns a future holding the request result.
+ *
+ * <p>If the server does not serve a KvState instance with the given ID,
+ * the Future will be failed with a {@link UnknownKvStateID}.
+ *
+ * <p>If the KvState instance does not hold any data for the given key
+ * and namespace, the Future will be failed with a {@link
UnknownKeyOrNamespace}.
+ *
+ * <p>All other failures are forwarded to the Future.
+ *
+ * @param jobId JobID of the job the queryable
state belongs to.
+ * @param queryableStateName Name under which the state is
queryable.
+ * @param key The key we are interested
in.
+ * @param keyTypeHint A {@link TypeHint} used
to extract the type of the key.
+ * @param stateDescriptor The {@link
StateDescriptor} of the state we want to query.
+ * @return Future holding the result.
+ */
+ @PublicEvolving
+ public <K, V> Future<V> getKvState(
+ final JobID jobId,
+ final String queryableStateName,
+ final K key,
+ final TypeHint<K> keyTypeHint,
+ final StateDescriptor<?, V> stateDescriptor) {
+
+ Preconditions.checkNotNull(keyTypeHint);
+
+ TypeInformation<K> keyTypeInfo = keyTypeHint.getTypeInfo();
+ return getKvState(jobId, queryableStateName, key, keyTypeInfo,
stateDescriptor);
+ }
+
+ /**
+ * Returns a future holding the request result.
+ *
+ * <p>If the server does not serve a KvState instance with the given ID,
+ * the Future will be failed with a {@link UnknownKvStateID}.
+ *
+ * <p>If the KvState instance does not hold any data for the given key
+ * and namespace, the Future will be failed with a {@link
UnknownKeyOrNamespace}.
+ *
+ * <p>All other failures are forwarded to the Future.
+ *
+ * @param jobId JobID of the job the queryable
state belongs to.
+ * @param queryableStateName Name under which the state is
queryable.
+ * @param key The key we are interested
in.
+ * @param keyTypeInfo The {@link
TypeInformation} of the key.
+ * @param stateDescriptor The {@link
StateDescriptor} of the state we want to query.
+ * @return Future holding the result.
+ */
+ @PublicEvolving
+ public <K, V> Future<V> getKvState(
+ final JobID jobId,
+ final String queryableStateName,
+ final K key,
+ final TypeInformation<K> keyTypeInfo,
+ final StateDescriptor<?, V> stateDescriptor) {
+
+ Preconditions.checkNotNull(keyTypeInfo);
+
+ return getKvState(jobId, queryableStateName, key,
VoidNamespace.INSTANCE,
+ keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE,
stateDescriptor);
+ }
+
+ /**
+ * Returns a future holding the request result.
+ *
+ * <p>If the server does not serve a KvState instance with the given ID,
+ * the Future will be failed with a {@link UnknownKvStateID}.
+ *
+ * <p>If the KvState instance does not hold any data for the given key
+ * and namespace, the Future will be failed with a {@link
UnknownKeyOrNamespace}.
+ *
+ * <p>All other failures are forwarded to the Future.
+ *
+ * @param jobId JobID of the job the queryable
state belongs to.
+ * @param queryableStateName Name under which the state is
queryable.
+ * @param key The key that the state we
request is associated with.
+ * @param namespace The namespace
of the state.
+ * @param keyTypeInfo The {@link
TypeInformation} of the keys.
+ * @param namespaceTypeInfo The {@link
TypeInformation} of the namespace.
+ * @param stateDescriptor The {@link
StateDescriptor} of the state we want to query.
+ * @return Future holding the result.
+ */
+ @PublicEvolving
+ public <K, V, N> Future<V> getKvState(
+ final JobID jobId,
+ final String queryableStateName,
+ final K key,
+ final N namespace,
+ final TypeInformation<K> keyTypeInfo,
+ final TypeInformation<N> namespaceTypeInfo,
+ final StateDescriptor<?, V> stateDescriptor) {
+
+ Preconditions.checkNotNull(stateDescriptor);
+
+ // initialize the value serializer based on the execution
config.
+ stateDescriptor.initializeSerializerUnlessSet(executionConfig);
+ TypeSerializer<V> valueSerializer =
stateDescriptor.getSerializer();
+
+ return getKvState(jobId, queryableStateName, key,
+ namespace, keyTypeInfo, namespaceTypeInfo,
valueSerializer);
+ }
+
+ /**
+ * Returns a future holding the request result.
+ *
+ * <p>If the server does not serve a KvState instance with the given ID,
+ * the Future will be failed with a {@link UnknownKvStateID}.
+ *
+ * <p>If the KvState instance does not hold any data for the given key
+ * and namespace, the Future will be failed with a {@link
UnknownKeyOrNamespace}.
+ *
+ * <p>All other failures are forwarded to the Future.
+ *
+ * @param jobId JobID of the job the queryable
state belongs to.
+ * @param queryableStateName Name under which the state is
queryable.
+ * @param key The key that the state we
request is associated with.
+ * @param namespace The namespace
of the state.
+ * @param keyTypeInfo The {@link
TypeInformation} of the keys.
+ * @param namespaceTypeInfo The {@link
TypeInformation} of the namespace.
+ * @param valueSerializer The {@link
TypeSerializer} of the state we want to query.
+ * @return Future holding the result.
+ */
+ @PublicEvolving
+ public <K, V, N> Future<V> getKvState(
+ final JobID jobId,
+ final String queryableStateName,
+ final K key,
+ final N namespace,
+ final TypeInformation<K> keyTypeInfo,
+ final TypeInformation<N> namespaceTypeInfo,
+ final TypeSerializer<V> valueSerializer) {
--- End diff --
I think this could be `stateSerializer`, since it's not restricted to value
states, having "value" in there might confuse some people when looking at the
code.
> Add methods to the client API that take the stateDescriptor.
> ------------------------------------------------------------
>
> Key: FLINK-7044
> URL: https://issues.apache.org/jira/browse/FLINK-7044
> Project: Flink
> Issue Type: Improvement
> Components: Queryable State
> Affects Versions: 1.3.0, 1.3.1
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Fix For: 1.4.0
>
>
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)