lukecwik commented on a change in pull request #15238:
URL: https://github.com/apache/beam/pull/15238#discussion_r688068062



##########
File path: model/fn-execution/src/main/proto/beam_fn_api.proto
##########
@@ -721,15 +731,39 @@ message StateKey {
     bytes key = 4;
   }
 
+  // Represents a request for the keys of a multimap associated with a specified
+  // user key and window in a PCollection. See
+  // https://s.apache.org/beam-fn-state-api-and-bundle-processing for further
+  // details.
+  //
+  // Can only be used to perform StateGetRequests and StateClearRequests on the
+  // user state.
+  //
+  // For a PCollection<KV<K, V>>, the response data stream will be a
+  // concatenation of all K's associated with the specified user key and window.
+  // See https://s.apache.org/beam-fn-api-send-and-receive-data for further
+  // details.
   message MultimapKeysUserState {
     // (Required) The id of the PTransform containing user state.
     string transform_id = 1;
     // (Required) The id of the user state.
     string user_state_id = 2;
     // (Required) The window encoded in a nested context.
     bytes window = 3;
+    // (Required) The key of the currently executing element encoded in a
+    // nested context.
+    bytes key = 4;
   }
 
+  // Represents a request for the values of the lookup key associated with a
+  // specified user key and window in a PCollection. See
+  // https://s.apache.org/beam-fn-state-api-and-bundle-processing for further

Review comment:
       Ditto here: user state is not a PCollection, it is data associated with the transform itself.
   
   See https://beam.apache.org/blog/stateful-processing/

##########
File path: model/fn-execution/src/main/proto/beam_fn_api.proto
##########
@@ -740,6 +774,8 @@ message StateKey {
     // (Required) The key of the currently executing element encoded in a
     // nested context.
     bytes key = 4;
+    // (Required) The encoded look up key for the map.
+    bytes map_key = 5;

Review comment:
       ```suggestion
       // (Required) The encoded lookup key for the map encoded in a nested context.
       bytes map_key = 5;
       ```

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
##########
@@ -325,7 +329,94 @@ public void clear() {
 
   @Override
   public <T> SetState<T> bindSet(String id, StateSpec<SetState<T>> spec, Coder<T> elemCoder) {
-    throw new UnsupportedOperationException("TODO: Add support for a map state to the Fn API.");
+    return (SetState<T>)
+        stateKeyObjectCache.computeIfAbsent(
+            createMultimapUserStateKey(id),
+            new Function<StateKey, Object>() {
+              @Override
+              public Object apply(StateKey key) {
+                return new SetState<T>() {
+                  private final MultimapUserState<T, Boolean> impl =
+                      createMultimapUserState(id, elemCoder, BooleanCoder.of());
+
+                  @Override
+                  public void clear() {
+                    impl.clear();
+                  }
+
+                  @Override
+                  public ReadableState<Boolean> contains(T t) {
+                    return new ReadableState<Boolean>() {
+                      @Override
+                      public @Nullable Boolean read() {
+                        return !Iterables.isEmpty(impl.get(t));
+                      }
+
+                      @Override
+                      public ReadableState<Boolean> readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Override
+                  public ReadableState<Boolean> addIfAbsent(T t) {
+                    return new ReadableState<Boolean>() {
+                      @Override
+                      public @Nullable Boolean read() {

Review comment:
       WindmillStateInternals adds the element to the Windmill map as a side effect of constructing the returned ReadableState
       ([here](https://github.com/apache/beam/blob/bd9e4d3f63d06f8156203173ab643530ea16ee52/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java#L1154)),
       so the user does not need to call read() for it to be added. Calling read() only tells the user whether it was added.
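
To illustrate the difference, here is a minimal sketch in plain Java. It does not use Beam types: `EagerAddIfAbsentSketch`, `addIfAbsentEager`, and `addIfAbsentLazy` are hypothetical names, and `Supplier<Boolean>` stands in for `ReadableState<Boolean>`. The eager variant mutates the set when the handle is constructed, which is the behaviour described above; the lazy variant only mutates it if the caller reads.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

/** Hypothetical stand-in for a set state; not a Beam class. */
public class EagerAddIfAbsentSketch {
  private final Set<String> elements = new HashSet<>();

  /** Eager: the element is added while the returned handle is being built. */
  public Supplier<Boolean> addIfAbsentEager(String t) {
    boolean added = elements.add(t); // side effect happens here, not in get()
    return () -> added; // reading only reports whether it was added
  }

  /** Lazy: nothing is added unless the caller invokes get(). */
  public Supplier<Boolean> addIfAbsentLazy(String t) {
    return () -> elements.add(t);
  }

  public boolean contains(String t) {
    return elements.contains(t);
  }
}
```

With the lazy variant, a caller that ignores the returned handle never adds the element, and calling `get()` twice reports different answers, which is why the eager form seems preferable here.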

##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -2172,22 +2170,6 @@ static void verifyDoFnSupported(DoFn<?, ?> fn, boolean streaming, boolean stream
               "%s does not currently support @RequiresTimeSortedInput in streaming mode.",
               DataflowRunner.class.getSimpleName()));
     }
-    if (DoFnSignatures.usesSetState(fn)) {

Review comment:
       Yes, unless you plan to add support to the current production Java worker.

##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -2236,13 +2238,33 @@ static boolean useStreamingEngine(DataflowPipelineOptions options) {
         || hasExperiment(options, GcpOptions.WINDMILL_SERVICE_EXPERIMENT);
   }
 
-  static void verifyDoFnSupported(DoFn<?, ?> fn, boolean streaming, boolean streamingEngine) {
+  static void verifyDoFnSupported(
+      DoFn<?, ?> fn, boolean streaming, DataflowPipelineOptions options) {
     if (streaming && DoFnSignatures.requiresTimeSortedInput(fn)) {
       throw new UnsupportedOperationException(
           String.format(
               "%s does not currently support @RequiresTimeSortedInput in streaming mode.",
               DataflowRunner.class.getSimpleName()));
     }
+    if (!useUnifiedWorker(options)) {

Review comment:
       Just making sure: based on the code you're developing within UW, will UW support map state and set state
       regardless of whether Windmill appliance or Streaming Engine is used?

##########
File path: model/fn-execution/src/main/proto/beam_fn_api.proto
##########
@@ -721,14 +721,36 @@ message StateKey {
     bytes key = 4;
   }

Review comment:
       lgtm

##########
File path: model/fn-execution/src/main/proto/beam_fn_api.proto
##########
@@ -721,14 +721,36 @@ message StateKey {
     bytes key = 4;
   }
 
+  message MultimapKeysUserState {

Review comment:
       User state isn't stored in a PCollection. It is state associated with the transform itself, keyed by the
       currently processing window and key, which is how it differs from side inputs.
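
One way to picture this is a small plain-Java sketch, with hypothetical names throughout (`UserStateAddressingSketch`, `append`, `read`); it is not how a runner stores state. The four address components mirror the `transform_id`, `user_state_id`, `window`, and `key` fields in the proto, with strings standing in for the encoded bytes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration: state cells addressed by transform, state id, window, and user key. */
public class UserStateAddressingSketch {
  private final Map<List<String>, List<String>> cells = new HashMap<>();

  // Strings stand in for the encoded window and key bytes (an assumption for brevity).
  private static List<String> address(
      String transformId, String userStateId, String window, String key) {
    return Arrays.asList(transformId, userStateId, window, key);
  }

  public void append(
      String transformId, String userStateId, String window, String key, String value) {
    cells
        .computeIfAbsent(address(transformId, userStateId, window, key), k -> new ArrayList<>())
        .add(value);
  }

  public List<String> read(String transformId, String userStateId, String window, String key) {
    return cells.getOrDefault(address(transformId, userStateId, window, key), new ArrayList<>());
  }
}
```

Nothing in the address refers to a PCollection: the same state id under a different window or user key is a separate cell.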

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
##########
@@ -325,7 +329,94 @@ public void clear() {
 
   @Override
   public <T> SetState<T> bindSet(String id, StateSpec<SetState<T>> spec, Coder<T> elemCoder) {
-    throw new UnsupportedOperationException("TODO: Add support for a map state to the Fn API.");
+    return (SetState<T>)
+        stateKeyObjectCache.computeIfAbsent(
+            createMultimapUserStateKey(id),
+            new Function<StateKey, Object>() {
+              @Override
+              public Object apply(StateKey key) {
+                return new SetState<T>() {
+                  private final MultimapUserState<T, Boolean> impl =
+                      createMultimapUserState(id, elemCoder, BooleanCoder.of());

Review comment:
       No, to the runner it is just a byte[]: for void it's 0 length and for boolean it's 1 length. So it's a very
       minor perf optimization, but for very large sets it could matter a little.
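
As a rough illustration of the size difference, here is a plain-Java sketch with hypothetical names (`ValueSizeSketch`, `encodeBoolean`, `encodeVoid`, `totalValueBytes`). It assumes the boolean is written as a single byte, as `DataOutputStream.writeBoolean` does, and that a void value contributes no bytes; it does not call Beam's coders.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical illustration of per-value payload size; not Beam's coder implementation. */
public class ValueSizeSketch {
  /** Encodes a boolean as one byte, the way DataOutputStream.writeBoolean does. */
  public static byte[] encodeBoolean(boolean value) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeBoolean(value);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  /** A void-like placeholder value: nothing is written. */
  public static byte[] encodeVoid() {
    return new byte[0];
  }

  /** Total bytes spent on placeholder values for a set with the given number of elements. */
  public static long totalValueBytes(long setSize, int bytesPerValue) {
    return setSize * bytesPerValue;
  }
}
```

Under these assumptions the saving is one byte per element, so it only becomes noticeable for sets with a very large number of entries.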



