igalshilman commented on a change in pull request #126:
URL: https://github.com/apache/flink-statefun/pull/126#discussion_r441608182



##########
File path: 
statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/PersistedStateRegistry.java
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk.state;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.StatefulFunction;
+import org.apache.flink.statefun.sdk.annotations.ForRuntime;
+import org.apache.flink.statefun.sdk.annotations.Persisted;
+
+/**
+ * A {@link PersistedStateRegistry} can be used to register persisted state, 
such as a {@link
+ * PersistedValue} or {@link PersistedTable}, etc. All state that is 
registered via this registry is
+ * persisted and maintained by the system for fault-tolerance.
+ *
+ * <p>Created state registries must be bound to the system by using the {@link 
Persisted}
+ * annotation. Please see the class-level Javadoc of {@link StatefulFunction} 
for an example on how
+ * to do that.
+ *
+ * @see StatefulFunction
+ */
+public final class PersistedStateRegistry {
+
+  private final Map<String, Object> registeredStates = new HashMap<>();
+
+  private StateBinder stateBinder;
+
+  /**
+   * The type of the function that this registry is bound to. This is {@code 
NULL} if this registry
+   * is not bounded.
+   */
+  @Nullable private FunctionType functionType;
+
+  public PersistedStateRegistry() {
+    this.stateBinder = new NonFaultTolerantStateBinder();
+  }
+
+  /**
+   * Registers a {@link PersistedValue}, given a state name and the type of 
the values. If a
+   * registered value already exists for the given name, the previous 
persisted value is returned.
+   *
+   * @param name the state name to register with.
+   * @param type the type of the value.
+   * @param <T> the type of the value.
+   * @return the registered value, or the previous registered value if a 
registration for the state
+   *     name already exists.
+   * @throws IllegalStateException if a previous registration exists for the 
given state name, but
+   *     it wasn't registered as a {@link PersistedValue}.
+   */
+  public <T> PersistedValue<T> registerValue(String name, Class<T> type) {
+    return registerValue(name, type, Expiration.none());
+  }
+
+  /**
+   * Registers a {@link PersistedValue}, given a state name and the type of 
the values. If a
+   * registered value already exists for the given name, the previous 
persisted value is returned.
+   *
+   * @param name the state name to register with.
+   * @param type the type of the value.
+   * @param expiration expiration configuration for the registered state.
+   * @param <T> the type of the value.
+   * @return the registered value, or the previous registered value if a 
registration for the state
+   *     name already exists.
+   * @throws IllegalStateException if a previous registration exists for the 
given state name, but
+   *     it wasn't registered as a {@link PersistedValue}.
+   */
+  public <T> PersistedValue<T> registerValue(String name, Class<T> type, 
Expiration expiration) {
+    return getStateOrCreateIfAbsent(
+        PersistedValue.class, name, stateName -> createValue(stateName, type, 
expiration));
+  }
+
+  /**
+   * Registers a {@link PersistedTable}, given a state name and the type of 
the keys and values of
+   * the table. If a registered value already exists for the given name, the 
previous persisted
+   * table is returned.
+   *
+   * @param name the state name to register with.
+   * @param keyType the type of the keys.
+   * @param valueType the type of the values.
+   * @param <K> the type of the keys.
+   * @param <V> the type of the values.
+   * @return the registered table, or the previous registered table if a 
registration for the state
+   *     name already exists.
+   * @throws IllegalStateException if a previous registration exists for the 
given state name, but
+   *     it wasn't registered as a {@link PersistedTable}.
+   */
+  public <K, V> PersistedTable<K, V> registerTable(
+      String name, Class<K> keyType, Class<V> valueType) {
+    return registerTable(name, keyType, valueType, Expiration.none());
+  }
+
+  /**
+   * Registers a {@link PersistedTable}, given a state name and the type of 
the keys and values of
+   * the table. If a registered value already exists for the given name, the 
previous persisted
+   * table is returned.
+   *
+   * @param name the state name to register with.
+   * @param keyType the type of the keys.
+   * @param valueType the type of the values.
+   * @param expiration expiration configuration for the registered state.
+   * @param <K> the type of the keys.
+   * @param <V> the type of the values.
+   * @return the registered table, or the previous registered table if a 
registration for the state
+   *     name already exists.
+   * @throws IllegalStateException if a previous registration exists for the 
given state name, but
+   *     it wasn't registered as a {@link PersistedTable}.
+   */
+  public <K, V> PersistedTable<K, V> registerTable(
+      String name, Class<K> keyType, Class<V> valueType, Expiration 
expiration) {
+    return getStateOrCreateIfAbsent(
+        PersistedTable.class,
+        name,
+        stateName -> createTable(stateName, keyType, valueType, expiration));
+  }
+
+  /**
+   * Registers a {@link PersistedAppendingBuffer}, given a state name and the 
type of the buffer
+   * elements. If a registered buffer already exists for the given name, the 
previous persisted
+   * buffer is returned.
+   *
+   * @param name the state name to register with.
+   * @param elementType the type of the buffer elements.
+   * @param <E> the type of the buffer elements.
+   * @return the registered buffer, or the previous registered buffer if a 
registration for the
+   *     state name already exists.
+   * @throws IllegalStateException if a previous registration exists for the 
given state name, but
+   *     it wasn't registered as a {@link PersistedAppendingBuffer}.
+   */
+  public <E> PersistedAppendingBuffer<E> registerAppendingBuffer(
+      String name, Class<E> elementType) {
+    return registerAppendingBuffer(name, elementType, Expiration.none());
+  }
+
+  /**
+   * Registers a {@link PersistedAppendingBuffer}, given a state name and the 
type of the buffer
+   * elements. If a registered buffer already exists for the given name, the 
previous persisted
+   * buffer is returned.
+   *
+   * @param name the state name to register with.
+   * @param elementType the type of the buffer elements.
+   * @param expiration expiration configuration for the registered state.
+   * @param <E> the type of the buffer elements.
+   * @return the registered buffer, or the previous registered buffer if a 
registration for the
+   *     state name already exists.
+   * @throws IllegalStateException if a previous registration exists for the 
given state name, but
+   *     it wasn't registered as a {@link PersistedAppendingBuffer}.
+   */
+  public <E> PersistedAppendingBuffer<E> registerAppendingBuffer(
+      String name, Class<E> elementType, Expiration expiration) {
+    return getStateOrCreateIfAbsent(
+        PersistedAppendingBuffer.class,
+        name,
+        stateName -> createAppendingBuffer(stateName, elementType, 
expiration));
+  }
+
+  /**
+   * Binds this state registry to a given function. All existing registered 
state in this registry
+   * will also be bound to the system.
+   *
+   * @param stateBinder the new fault-tolerant state binder to use.
+   * @param functionType the type of the function that this registry is being 
bound to.
+   * @throws IllegalStateException if the registry was attempted to be bound 
more than once.
+   */
+  @ForRuntime
+  void bind(StateBinder stateBinder, FunctionType functionType) {
+    if (this.functionType != null) {
+      throw new IllegalStateException(
+          "This registry was already bound to function type: "
+              + this.functionType
+              + ", attempting to rebind to function type: "
+              + functionType);
+    }
+
+    this.stateBinder = Objects.requireNonNull(stateBinder);
+    this.functionType = Objects.requireNonNull(functionType);
+
+    registeredStates.values().forEach(state -> stateBinder.bind(state, 
functionType));
+  }
+
+  private <T> PersistedValue<T> createValue(String name, Class<T> type, 
Expiration expiration) {

Review comment:
       How do you feel about: instead of having separate constructor methods 
for createValue, createTable etc'
   we accept an already constructed persisted object.
   For example:
   instead:
   ```
   public <K, V> PersistedTable<K, V> registerTable(
          String name, Class<K> keyType, Class<V> valueType, Expiration 
expiration)
    ```
   
   we will do:
   
   ```public <K,V> PersistedTable<K, V> registerTable(PersistedTable<K, V> 
table) { .. }```
   
   The advantage here is that we don't have to keep the constructor methods 
synchronized. 

##########
File path: 
statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/PersistedStateRegistry.java
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk.state;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.StatefulFunction;
+import org.apache.flink.statefun.sdk.annotations.ForRuntime;
+import org.apache.flink.statefun.sdk.annotations.Persisted;
+
+/**
+ * A {@link PersistedStateRegistry} can be used to register persisted state, 
such as a {@link
+ * PersistedValue} or {@link PersistedTable}, etc. All state that is 
registered via this registry is
+ * persisted and maintained by the system for fault-tolerance.
+ *
+ * <p>Created state registries must be bound to the system by using the {@link 
Persisted}
+ * annotation. Please see the class-level Javadoc of {@link StatefulFunction} 
for an example on how
+ * to do that.
+ *
+ * @see StatefulFunction
+ */
+public final class PersistedStateRegistry {
+
+  private final Map<String, Object> registeredStates = new HashMap<>();
+
+  private StateBinder stateBinder;
+
+  /**
+   * The type of the function that this registry is bound to. This is {@code 
NULL} if this registry
+   * is not bounded.
+   */
+  @Nullable private FunctionType functionType;
+
+  public PersistedStateRegistry() {
+    this.stateBinder = new NonFaultTolerantStateBinder();
+  }
+
+  /**
+   * Registers a {@link PersistedValue}, given a state name and the type of 
the values. If a
+   * registered value already exists for the given name, the previous 
persisted value is returned.
+   *
+   * @param name the state name to register with.
+   * @param type the type of the value.
+   * @param <T> the type of the value.
+   * @return the registered value, or the previous registered value if a 
registration for the state
+   *     name already exists.
+   * @throws IllegalStateException if a previous registration exists for the 
given state name, but
+   *     it wasn't registered as a {@link PersistedValue}.
+   */
+  public <T> PersistedValue<T> registerValue(String name, Class<T> type) {
+    return registerValue(name, type, Expiration.none());
+  }
+
+  /**
+   * Registers a {@link PersistedValue}, given a state name and the type of 
the values. If a
+   * registered value already exists for the given name, the previous 
persisted value is returned.
+   *
+   * @param name the state name to register with.
+   * @param type the type of the value.
+   * @param expiration expiration configuration for the registered state.
+   * @param <T> the type of the value.
+   * @return the registered value, or the previous registered value if a 
registration for the state
+   *     name already exists.
+   * @throws IllegalStateException if a previous registration exists for the 
given state name, but
+   *     it wasn't registered as a {@link PersistedValue}.
+   */
+  public <T> PersistedValue<T> registerValue(String name, Class<T> type, 
Expiration expiration) {
+    return getStateOrCreateIfAbsent(
+        PersistedValue.class, name, stateName -> createValue(stateName, type, 
expiration));
+  }
+
+  /**
+   * Registers a {@link PersistedTable}, given a state name and the type of 
the keys and values of
+   * the table. If a registered value already exists for the given name, the 
previous persisted
+   * table is returned.
+   *
+   * @param name the state name to register with.
+   * @param keyType the type of the keys.
+   * @param valueType the type of the values.
+   * @param <K> the type of the keys.
+   * @param <V> the type of the values.
+   * @return the registered table, or the previous registered table if a 
registration for the state
+   *     name already exists.
+   * @throws IllegalStateException if a previous registration exists for the 
given state name, but
+   *     it wasn't registered as a {@link PersistedTable}.
+   */
+  public <K, V> PersistedTable<K, V> registerTable(
+      String name, Class<K> keyType, Class<V> valueType) {
+    return registerTable(name, keyType, valueType, Expiration.none());
+  }
+
+  /**
+   * Registers a {@link PersistedTable}, given a state name and the type of 
the keys and values of
+   * the table. If a registered value already exists for the given name, the 
previous persisted
+   * table is returned.
+   *
+   * @param name the state name to register with.
+   * @param keyType the type of the keys.
+   * @param valueType the type of the values.
+   * @param expiration expiration configuration for the registered state.
+   * @param <K> the type of the keys.
+   * @param <V> the type of the values.
+   * @return the registered table, or the previous registered table if a 
registration for the state
+   *     name already exists.
+   * @throws IllegalStateException if a previous registration exists for the 
given state name, but
+   *     it wasn't registered as a {@link PersistedTable}.
+   */
+  public <K, V> PersistedTable<K, V> registerTable(
+      String name, Class<K> keyType, Class<V> valueType, Expiration 
expiration) {
+    return getStateOrCreateIfAbsent(
+        PersistedTable.class,
+        name,
+        stateName -> createTable(stateName, keyType, valueType, expiration));
+  }
+
+  /**
+   * Registers a {@link PersistedAppendingBuffer}, given a state name and the 
type of the buffer
+   * elements. If a registered buffer already exists for the given name, the 
previous persisted
+   * buffer is returned.
+   *
+   * @param name the state name to register with.
+   * @param elementType the type of the buffer elements.
+   * @param <E> the type of the buffer elements.
+   * @return the registered buffer, or the previous registered buffer if a 
registration for the
+   *     state name already exists.
+   * @throws IllegalStateException if a previous registration exists for the 
given state name, but
+   *     it wasn't registered as a {@link PersistedAppendingBuffer}.
+   */
+  public <E> PersistedAppendingBuffer<E> registerAppendingBuffer(
+      String name, Class<E> elementType) {
+    return registerAppendingBuffer(name, elementType, Expiration.none());
+  }
+
+  /**
+   * Registers a {@link PersistedAppendingBuffer}, given a state name and the 
type of the buffer
+   * elements. If a registered buffer already exists for the given name, the 
previous persisted
+   * buffer is returned.
+   *
+   * @param name the state name to register with.
+   * @param elementType the type of the buffer elements.
+   * @param expiration expiration configuration for the registered state.
+   * @param <E> the type of the buffer elements.
+   * @return the registered buffer, or the previous registered buffer if a 
registration for the
+   *     state name already exists.
+   * @throws IllegalStateException if a previous registration exists for the 
given state name, but
+   *     it wasn't registered as a {@link PersistedAppendingBuffer}.
+   */
+  public <E> PersistedAppendingBuffer<E> registerAppendingBuffer(
+      String name, Class<E> elementType, Expiration expiration) {
+    return getStateOrCreateIfAbsent(
+        PersistedAppendingBuffer.class,
+        name,
+        stateName -> createAppendingBuffer(stateName, elementType, 
expiration));
+  }
+
+  /**
+   * Binds this state registry to a given function. All existing registered 
state in this registry
+   * will also be bound to the system.
+   *
+   * @param stateBinder the new fault-tolerant state binder to use.
+   * @param functionType the type of the function that this registry is being 
bound to.
+   * @throws IllegalStateException if the registry was attempted to be bound 
more than once.
+   */
+  @ForRuntime
+  void bind(StateBinder stateBinder, FunctionType functionType) {
+    if (this.functionType != null) {
+      throw new IllegalStateException(
+          "This registry was already bound to function type: "
+              + this.functionType
+              + ", attempting to rebind to function type: "
+              + functionType);
+    }
+
+    this.stateBinder = Objects.requireNonNull(stateBinder);
+    this.functionType = Objects.requireNonNull(functionType);
+
+    registeredStates.values().forEach(state -> stateBinder.bind(state, 
functionType));

Review comment:
       @tzulitai I think that the `StateBinder` should already know the 
`functionType` when calling `stateBinder.bind()`
   

##########
File path: 
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
##########
@@ -228,27 +204,27 @@ private void handleOutgoingDelayedMessages(Context 
context, InvocationResponse i
   // 
--------------------------------------------------------------------------------
 
   private void addStates(ToFunction.InvocationBatchRequest.Builder 
batchBuilder) {
-    for (StateSpec stateSpec : registeredStates) {
-      ToFunction.PersistedValue.Builder valueBuilder =
-          
ToFunction.PersistedValue.newBuilder().setStateName(stateSpec.name());
-
-      byte[] stateValue = managedStates.get(stateSpec.name());
-      if (stateValue != null) {
-        valueBuilder.setStateValue(ByteString.copyFrom(stateValue));
-      }
-      batchBuilder.addState(valueBuilder);
-    }
+    managedStates.forEach(

Review comment:
       I think that mangedState should have a `populateBatch` method,
   to avoid the forEach allocation.

##########
File path: 
statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/state/PersistedStatesTest.java
##########
@@ -96,6 +97,24 @@ public void bindPersistedAppendingBuffer() {
     assertThat(state.boundNames, hasItems("buffer"));
   }
 
+  @Test
+  public void bindDynamicState() {

Review comment:
       👍 that is a super doper great test

##########
File path: 
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.reqreply;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.BiConsumer;
+import org.apache.flink.statefun.flink.core.httpfn.StateSpec;
+import org.apache.flink.statefun.sdk.annotations.Persisted;
+import org.apache.flink.statefun.sdk.state.Expiration;
+import org.apache.flink.statefun.sdk.state.PersistedStateRegistry;
+import org.apache.flink.statefun.sdk.state.PersistedValue;
+
+public final class PersistedRemoteFunctionValues {
+
+  @Persisted private final PersistedStateRegistry stateRegistry = new 
PersistedStateRegistry();
+
+  private final Map<String, PersistedValue<byte[]>> managedStates;
+
+  public PersistedRemoteFunctionValues(List<StateSpec> stateSpecs) {
+    Objects.requireNonNull(stateSpecs);
+    this.managedStates = new HashMap<>(stateSpecs.size());
+    stateSpecs.forEach(spec -> managedStates.put(spec.name(), 
createStateHandle(spec)));
+  }
+
+  void forEach(BiConsumer<String, byte[]> consumer) {

Review comment:
       since this is very tied to the `ToFunction` aspect of the code, can we 
pass in the `ToFunction` builder to avoid a allocation for the closure?

##########
File path: 
statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/StateBinder.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk.state;
+
+import org.apache.flink.statefun.sdk.FunctionType;
+
+public abstract class StateBinder {
+  public abstract void bindValue(PersistedValue<?> persistedValue, 
FunctionType functionType);
+
+  public abstract void bindTable(PersistedTable<?, ?> persistedTable, 
FunctionType functionType);
+
+  public abstract void bindAppendingBuffer(
+      PersistedAppendingBuffer<?> persistedAppendingBuffer, FunctionType 
functionType);
+
+  public final void bind(Object stateObject, FunctionType functionType) {

Review comment:
       I think that this logic doesn't belong to the SDK, and it should be part 
of the runtime, 
   and this should be a simple interface.

##########
File path: 
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/PersistedStates.java
##########
@@ -26,14 +26,31 @@
 import java.util.stream.Stream;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import org.apache.flink.statefun.sdk.FunctionType;
 import org.apache.flink.statefun.sdk.annotations.Persisted;
 import org.apache.flink.statefun.sdk.state.PersistedAppendingBuffer;
 import org.apache.flink.statefun.sdk.state.PersistedTable;
 import org.apache.flink.statefun.sdk.state.PersistedValue;
 
-final class PersistedStates {
+public final class PersistedStates {

Review comment:
       I think that this class should reflect the fact that it reflectively 
looks for states.
   i.e. `ReflectiveStateBinder`.
   
   

##########
File path: 
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/StateBinder.java
##########
@@ -39,28 +37,22 @@ public StateBinder(@Label("state") State state) {
     this.state = Objects.requireNonNull(state);
   }
 
-  public void bind(FunctionType functionType, @Nullable Object instance) {

Review comment:
       👍 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to