This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 62cf2e406c23fd623cbe082229bd182c3293297f Author: Tzu-Li (Gordon) Tai <[email protected]> AuthorDate: Sun Oct 11 13:06:54 2020 +0800 [FLINK-20265] [core] Allow PersistedRemoteFunctionValues to register states based on protocol PersistedValueSpecs --- .../reqreply/PersistedRemoteFunctionValues.java | 51 +++++++++++++++++++++- 1 file changed, 50 insertions(+), 1 deletion(-) diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java index 4733366..4977e2e 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java @@ -18,13 +18,17 @@ package org.apache.flink.statefun.flink.core.reqreply; +import java.time.Duration; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.function.BiConsumer; import org.apache.flink.statefun.flink.core.httpfn.StateSpec; +import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.ExpirationSpec; +import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.PersistedValueSpec; import org.apache.flink.statefun.sdk.annotations.Persisted; +import org.apache.flink.statefun.sdk.state.Expiration; import org.apache.flink.statefun.sdk.state.PersistedStateRegistry; import org.apache.flink.statefun.sdk.state.PersistedValue; @@ -52,6 +56,49 @@ public final class PersistedRemoteFunctionValues { getStateHandleOrThrow(stateName).clear(); } + /** + * Registers states that were indicated to be missing by remote functions via the remote + * invocation protocol. + * + * <p>A state is registered with the provided specification only if it wasn't registered already + * under the same name (identified by {@link PersistedValueSpec#getStateName()}). This means that + * you cannot change the specifications of an already registered state name, e.g. state TTL + * expiration configuration cannot be changed. + * + * @param protocolPersistedValueSpecs list of specifications for the indicated missing states. + */ + void registerStates(List<PersistedValueSpec> protocolPersistedValueSpecs) { + protocolPersistedValueSpecs.forEach(this::createAndRegisterValueStateIfAbsent); + } + + private void createAndRegisterValueStateIfAbsent(PersistedValueSpec protocolPersistedValueSpec) { + final String stateName = protocolPersistedValueSpec.getStateName(); + + if (!managedStates.containsKey(stateName)) { + final PersistedValue<byte[]> stateValue = + PersistedValue.of( + stateName, + byte[].class, + sdkTtlExpiration(protocolPersistedValueSpec.getExpirationSpec())); + stateRegistry.registerValue(stateValue); + managedStates.put(stateName, stateValue); + } + } + + private static Expiration sdkTtlExpiration(ExpirationSpec protocolExpirationSpec) { + final long expirationTtlMillis = protocolExpirationSpec.getExpireAfterMillis(); + + switch (protocolExpirationSpec.getMode()) { + case AFTER_INVOKE: + return Expiration.expireAfterReadingOrWriting(Duration.ofMillis(expirationTtlMillis)); + case AFTER_WRITE: + return Expiration.expireAfterWriting(Duration.ofMillis(expirationTtlMillis)); + default: + case NONE: + return Expiration.none(); + } + } + private void createAndRegisterValueState(StateSpec stateSpec) { final String stateName = stateSpec.name(); @@ -65,7 +112,9 @@ public final class PersistedRemoteFunctionValues { final PersistedValue<byte[]> handle = managedStates.get(stateName); if (handle == null) { throw new IllegalStateException( - "Accessing a non-existing remote function state: " + stateName); + "Accessing a non-existing function state: " + + stateName + + ". This can happen if you forgot to declare this state using the language SDKs."); } return handle; }
