This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit c966b29502a73d7f687f57b7eb5257b1643dd2aa
Author: Tzu-Li (Gordon) Tai <[email protected]>
AuthorDate: Mon Oct 12 13:50:28 2020 +0800

    [hotfix] [core] Refactor state utility methods into PersistedRemoteFunctionValues
    
    This brings state management logic closer to
    PersistedRemoteFunctionValues, which actually has the responsibility for
    state management and access.
---
 .../reqreply/PersistedRemoteFunctionValues.java    | 44 ++++++++++++++++++----
 .../flink/core/reqreply/RequestReplyFunction.java  | 40 +-------------------
 2 files changed, 38 insertions(+), 46 deletions(-)

diff --git 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java
 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java
index ed75862..857bcb6 100644
--- 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java
+++ 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java
@@ -18,15 +18,18 @@
 
 package org.apache.flink.statefun.flink.core.reqreply;
 
+import com.google.protobuf.ByteString;
 import java.time.Duration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.function.BiConsumer;
 import org.apache.flink.statefun.flink.core.httpfn.StateSpec;
 import 
org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.ExpirationSpec;
+import 
org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.PersistedValueMutation;
 import 
org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.PersistedValueSpec;
+import org.apache.flink.statefun.flink.core.polyglot.generated.ToFunction;
+import 
org.apache.flink.statefun.flink.core.polyglot.generated.ToFunction.InvocationBatchRequest;
 import org.apache.flink.statefun.sdk.annotations.Persisted;
 import org.apache.flink.statefun.sdk.state.Expiration;
 import org.apache.flink.statefun.sdk.state.PersistedStateRegistry;
@@ -51,16 +54,41 @@ public final class PersistedRemoteFunctionValues {
     stateSpecs.forEach(this::createAndRegisterEagerValueState);
   }
 
-  void forEach(BiConsumer<String, byte[]> consumer) {
-    managedStates.forEach((stateName, handle) -> consumer.accept(stateName, 
handle.get()));
-  }
+  void attachStateValues(InvocationBatchRequest.Builder batchBuilder) {
+    for (Map.Entry<String, PersistedValue<byte[]>> managedStateEntry : 
managedStates.entrySet()) {
+      final ToFunction.PersistedValue.Builder valueBuilder =
+          
ToFunction.PersistedValue.newBuilder().setStateName(managedStateEntry.getKey());
 
-  void setValue(String stateName, byte[] value) {
-    getStateHandleOrThrow(stateName).set(value);
+      final byte[] stateValue = managedStateEntry.getValue().get();
+      if (stateValue != null) {
+        valueBuilder.setStateValue(ByteString.copyFrom(stateValue));
+      }
+      batchBuilder.addState(valueBuilder);
+    }
   }
 
-  void clearValue(String stateName) {
-    getStateHandleOrThrow(stateName).clear();
+  void updateStateValues(List<PersistedValueMutation> valueMutations) {
+    for (PersistedValueMutation mutate : valueMutations) {
+      final String stateName = mutate.getStateName();
+      switch (mutate.getMutationType()) {
+        case DELETE:
+          {
+            getStateHandleOrThrow(stateName).clear();
+            break;
+          }
+        case MODIFY:
+          {
+            
getStateHandleOrThrow(stateName).set(mutate.getStateValue().toByteArray());
+            break;
+          }
+        case UNRECOGNIZED:
+          {
+            break;
+          }
+        default:
+          throw new IllegalStateException("Unexpected value: " + 
mutate.getMutationType());
+      }
+    }
   }
 
   /**
diff --git 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
index 6e4ad78..66f3a24 100644
--- 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
+++ 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
@@ -22,7 +22,6 @@ import static 
org.apache.flink.statefun.flink.core.common.PolyglotUtil.polyglotA
 import static 
org.apache.flink.statefun.flink.core.common.PolyglotUtil.sdkAddressToPolyglotAddress;
 
 import com.google.protobuf.Any;
-import com.google.protobuf.ByteString;
 import java.time.Duration;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
@@ -171,7 +170,7 @@ public final class RequestReplyFunction implements 
StatefulFunction {
     handleOutgoingMessages(context, invocationResult);
     handleOutgoingDelayedMessages(context, invocationResult);
     handleEgressMessages(context, invocationResult);
-    handleStateMutations(invocationResult);
+    managedStates.updateStateValues(invocationResult.getStateMutationsList());
   }
 
   private void handleEgressMessages(Context context, InvocationResponse 
invocationResult) {
@@ -204,41 +203,6 @@ public final class RequestReplyFunction implements 
StatefulFunction {
   }
 
   // 
--------------------------------------------------------------------------------
-  // State Management
-  // 
--------------------------------------------------------------------------------
-
-  private void addStates(ToFunction.InvocationBatchRequest.Builder 
batchBuilder) {
-    managedStates.forEach(
-        (stateName, stateValue) -> {
-          ToFunction.PersistedValue.Builder valueBuilder =
-              ToFunction.PersistedValue.newBuilder().setStateName(stateName);
-
-          if (stateValue != null) {
-            valueBuilder.setStateValue(ByteString.copyFrom(stateValue));
-          }
-          batchBuilder.addState(valueBuilder);
-        });
-  }
-
-  private void handleStateMutations(InvocationResponse invocationResult) {
-    for (FromFunction.PersistedValueMutation mutate : 
invocationResult.getStateMutationsList()) {
-      final String stateName = mutate.getStateName();
-      switch (mutate.getMutationType()) {
-        case DELETE:
-          managedStates.clearValue(stateName);
-          break;
-        case MODIFY:
-          managedStates.setValue(stateName, 
mutate.getStateValue().toByteArray());
-          break;
-        case UNRECOGNIZED:
-          break;
-        default:
-          throw new IllegalStateException("Unexpected value: " + 
mutate.getMutationType());
-      }
-    }
-  }
-
-  // 
--------------------------------------------------------------------------------
   // Send Message to Remote Function
   // 
--------------------------------------------------------------------------------
   /**
@@ -267,7 +231,7 @@ public final class RequestReplyFunction implements 
StatefulFunction {
   /** Sends a {@link InvocationBatchRequest} to the remote function. */
   private void sendToFunction(Context context, InvocationBatchRequest.Builder 
batchBuilder) {
     batchBuilder.setTarget(sdkAddressToPolyglotAddress(context.self()));
-    addStates(batchBuilder);
+    managedStates.attachStateValues(batchBuilder);
     ToFunction toFunction = 
ToFunction.newBuilder().setInvocation(batchBuilder).build();
     sendToFunction(context, toFunction);
   }

Reply via email to