This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit c966b29502a73d7f687f57b7eb5257b1643dd2aa Author: Tzu-Li (Gordon) Tai <[email protected]> AuthorDate: Mon Oct 12 13:50:28 2020 +0800 [hotfix] [core] Refactor state utility methods into PersistedRemoteFunctionValues This brings state management logic closer to PersistedRemoteFunctionValues, which actually has the responsibility for state management and access. --- .../reqreply/PersistedRemoteFunctionValues.java | 44 ++++++++++++++++++---- .../flink/core/reqreply/RequestReplyFunction.java | 40 +------------------- 2 files changed, 38 insertions(+), 46 deletions(-) diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java index ed75862..857bcb6 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java @@ -18,15 +18,18 @@ package org.apache.flink.statefun.flink.core.reqreply; +import com.google.protobuf.ByteString; import java.time.Duration; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.function.BiConsumer; import org.apache.flink.statefun.flink.core.httpfn.StateSpec; import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.ExpirationSpec; +import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.PersistedValueMutation; import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.PersistedValueSpec; +import org.apache.flink.statefun.flink.core.polyglot.generated.ToFunction; +import org.apache.flink.statefun.flink.core.polyglot.generated.ToFunction.InvocationBatchRequest; import 
org.apache.flink.statefun.sdk.annotations.Persisted; import org.apache.flink.statefun.sdk.state.Expiration; import org.apache.flink.statefun.sdk.state.PersistedStateRegistry; @@ -51,16 +54,41 @@ public final class PersistedRemoteFunctionValues { stateSpecs.forEach(this::createAndRegisterEagerValueState); } - void forEach(BiConsumer<String, byte[]> consumer) { - managedStates.forEach((stateName, handle) -> consumer.accept(stateName, handle.get())); - } + void attachStateValues(InvocationBatchRequest.Builder batchBuilder) { + for (Map.Entry<String, PersistedValue<byte[]>> managedStateEntry : managedStates.entrySet()) { + final ToFunction.PersistedValue.Builder valueBuilder = + ToFunction.PersistedValue.newBuilder().setStateName(managedStateEntry.getKey()); - void setValue(String stateName, byte[] value) { - getStateHandleOrThrow(stateName).set(value); + final byte[] stateValue = managedStateEntry.getValue().get(); + if (stateValue != null) { + valueBuilder.setStateValue(ByteString.copyFrom(stateValue)); + } + batchBuilder.addState(valueBuilder); + } } - void clearValue(String stateName) { - getStateHandleOrThrow(stateName).clear(); + void updateStateValues(List<PersistedValueMutation> valueMutations) { + for (PersistedValueMutation mutate : valueMutations) { + final String stateName = mutate.getStateName(); + switch (mutate.getMutationType()) { + case DELETE: + { + getStateHandleOrThrow(stateName).clear(); + break; + } + case MODIFY: + { + getStateHandleOrThrow(stateName).set(mutate.getStateValue().toByteArray()); + break; + } + case UNRECOGNIZED: + { + break; + } + default: + throw new IllegalStateException("Unexpected value: " + mutate.getMutationType()); + } + } } /** diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java index 6e4ad78..66f3a24 100644 --- 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java @@ -22,7 +22,6 @@ import static org.apache.flink.statefun.flink.core.common.PolyglotUtil.polyglotA import static org.apache.flink.statefun.flink.core.common.PolyglotUtil.sdkAddressToPolyglotAddress; import com.google.protobuf.Any; -import com.google.protobuf.ByteString; import java.time.Duration; import java.util.Objects; import java.util.concurrent.CompletableFuture; @@ -171,7 +170,7 @@ public final class RequestReplyFunction implements StatefulFunction { handleOutgoingMessages(context, invocationResult); handleOutgoingDelayedMessages(context, invocationResult); handleEgressMessages(context, invocationResult); - handleStateMutations(invocationResult); + managedStates.updateStateValues(invocationResult.getStateMutationsList()); } private void handleEgressMessages(Context context, InvocationResponse invocationResult) { @@ -204,41 +203,6 @@ public final class RequestReplyFunction implements StatefulFunction { } // -------------------------------------------------------------------------------- - // State Management - // -------------------------------------------------------------------------------- - - private void addStates(ToFunction.InvocationBatchRequest.Builder batchBuilder) { - managedStates.forEach( - (stateName, stateValue) -> { - ToFunction.PersistedValue.Builder valueBuilder = - ToFunction.PersistedValue.newBuilder().setStateName(stateName); - - if (stateValue != null) { - valueBuilder.setStateValue(ByteString.copyFrom(stateValue)); - } - batchBuilder.addState(valueBuilder); - }); - } - - private void handleStateMutations(InvocationResponse invocationResult) { - for (FromFunction.PersistedValueMutation mutate : invocationResult.getStateMutationsList()) { - final String stateName = mutate.getStateName(); - 
switch (mutate.getMutationType()) { - case DELETE: - managedStates.clearValue(stateName); - break; - case MODIFY: - managedStates.setValue(stateName, mutate.getStateValue().toByteArray()); - break; - case UNRECOGNIZED: - break; - default: - throw new IllegalStateException("Unexpected value: " + mutate.getMutationType()); - } - } - } - - // -------------------------------------------------------------------------------- // Send Message to Remote Function // -------------------------------------------------------------------------------- /** @@ -267,7 +231,7 @@ public final class RequestReplyFunction implements StatefulFunction { /** Sends a {@link InvocationBatchRequest} to the remote function. */ private void sendToFunction(Context context, InvocationBatchRequest.Builder batchBuilder) { batchBuilder.setTarget(sdkAddressToPolyglotAddress(context.self())); - addStates(batchBuilder); + managedStates.attachStateValues(batchBuilder); ToFunction toFunction = ToFunction.newBuilder().setInvocation(batchBuilder).build(); sendToFunction(context, toFunction); }
