This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 62cf2e406c23fd623cbe082229bd182c3293297f
Author: Tzu-Li (Gordon) Tai <[email protected]>
AuthorDate: Sun Oct 11 13:06:54 2020 +0800

    [FLINK-20265] [core] Allow PersistedRemoteFunctionValues to register states 
based on protocol PersistedValueSpecs
---
 .../reqreply/PersistedRemoteFunctionValues.java    | 51 +++++++++++++++++++++-
 1 file changed, 50 insertions(+), 1 deletion(-)

diff --git 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java
 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java
index 4733366..4977e2e 100644
--- 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java
+++ 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java
@@ -18,13 +18,17 @@
 
 package org.apache.flink.statefun.flink.core.reqreply;
 
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.function.BiConsumer;
 import org.apache.flink.statefun.flink.core.httpfn.StateSpec;
+import 
org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.ExpirationSpec;
+import 
org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.PersistedValueSpec;
 import org.apache.flink.statefun.sdk.annotations.Persisted;
+import org.apache.flink.statefun.sdk.state.Expiration;
 import org.apache.flink.statefun.sdk.state.PersistedStateRegistry;
 import org.apache.flink.statefun.sdk.state.PersistedValue;
 
@@ -52,6 +56,49 @@ public final class PersistedRemoteFunctionValues {
     getStateHandleOrThrow(stateName).clear();
   }
 
+  /**
+   * Registers states that were indicated to be missing by remote functions 
via the remote
+   * invocation protocol.
+   *
+   * <p>A state is registered with the provided specification only if it 
wasn't registered already
+   * under the same name (identified by {@link 
PersistedValueSpec#getStateName()}). This means that
+   * you cannot change the specifications of an already registered state name, 
e.g. state TTL
+   * expiration configuration cannot be changed.
+   *
+   * @param protocolPersistedValueSpecs list of specifications for the 
indicated missing states.
+   */
+  void registerStates(List<PersistedValueSpec> protocolPersistedValueSpecs) {
+    
protocolPersistedValueSpecs.forEach(this::createAndRegisterValueStateIfAbsent);
+  }
+
+  private void createAndRegisterValueStateIfAbsent(PersistedValueSpec 
protocolPersistedValueSpec) {
+    final String stateName = protocolPersistedValueSpec.getStateName();
+
+    if (!managedStates.containsKey(stateName)) {
+      final PersistedValue<byte[]> stateValue =
+          PersistedValue.of(
+              stateName,
+              byte[].class,
+              
sdkTtlExpiration(protocolPersistedValueSpec.getExpirationSpec()));
+      stateRegistry.registerValue(stateValue);
+      managedStates.put(stateName, stateValue);
+    }
+  }
+
+  private static Expiration sdkTtlExpiration(ExpirationSpec 
protocolExpirationSpec) {
+    final long expirationTtlMillis = 
protocolExpirationSpec.getExpireAfterMillis();
+
+    switch (protocolExpirationSpec.getMode()) {
+      case AFTER_INVOKE:
+        return 
Expiration.expireAfterReadingOrWriting(Duration.ofMillis(expirationTtlMillis));
+      case AFTER_WRITE:
+        return 
Expiration.expireAfterWriting(Duration.ofMillis(expirationTtlMillis));
+      default:
+      case NONE:
+        return Expiration.none();
+    }
+  }
+
   private void createAndRegisterValueState(StateSpec stateSpec) {
     final String stateName = stateSpec.name();
 
@@ -65,7 +112,9 @@ public final class PersistedRemoteFunctionValues {
     final PersistedValue<byte[]> handle = managedStates.get(stateName);
     if (handle == null) {
       throw new IllegalStateException(
-          "Accessing a non-existing remote function state: " + stateName);
+          "Accessing a non-existing function state: "
+              + stateName
+              + ". This can happen if you forgot to declare this state using 
the language SDKs.");
     }
     return handle;
   }

Reply via email to