kileys commented on a change in pull request #15238:
URL: https://github.com/apache/beam/pull/15238#discussion_r688165198



##########
File path: model/fn-execution/src/main/proto/beam_fn_api.proto
##########
@@ -721,14 +721,36 @@ message StateKey {
     bytes key = 4;
   }
 
+  message MultimapKeysUserState {

Review comment:
       Fixed

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
##########
@@ -325,7 +329,94 @@ public void clear() {
 
   @Override
   public <T> SetState<T> bindSet(String id, StateSpec<SetState<T>> spec, Coder<T> elemCoder) {
-    throw new UnsupportedOperationException("TODO: Add support for a map state to the Fn API.");
+    return (SetState<T>)
+        stateKeyObjectCache.computeIfAbsent(
+            createMultimapUserStateKey(id),
+            new Function<StateKey, Object>() {
+              @Override
+              public Object apply(StateKey key) {
+                return new SetState<T>() {
+                  private final MultimapUserState<T, Boolean> impl =
+                      createMultimapUserState(id, elemCoder, BooleanCoder.of());
+
+                  @Override
+                  public void clear() {
+                    impl.clear();
+                  }
+
+                  @Override
+                  public ReadableState<Boolean> contains(T t) {
+                    return new ReadableState<Boolean>() {
+                      @Override
+                      public @Nullable Boolean read() {
+                        return !Iterables.isEmpty(impl.get(t));
+                      }
+
+                      @Override
+                      public ReadableState<Boolean> readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Override
+                  public ReadableState<Boolean> addIfAbsent(T t) {
+                    return new ReadableState<Boolean>() {
+                      @Override
+                      public @Nullable Boolean read() {

Review comment:
       I see that `putIfAbsent` calls [computeIfAbsent](https://github.com/apache/beam/blob/bd9e4d3f63d06f8156203173ab643530ea16ee52/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java#L1514) which doesn't insert into the map until read is called on the ReadableState. Am I looking at this incorrectly?
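
To make the behavior in question concrete, here is a minimal sketch of a deferred `addIfAbsent`, assuming the write only happens when the returned handle is read. `LazyAddIfAbsentSketch` and `LazySet` are hypothetical names, and `java.util.function.Supplier` stands in for `ReadableState` so the example runs without Beam on the classpath; it is not the Dataflow or Fn API implementation.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

public class LazyAddIfAbsentSketch {
  /** Hypothetical stand-in for a set state whose addIfAbsent defers the write. */
  static final class LazySet<T> {
    private final Set<T> backing = new HashSet<>();

    /** Returns a deferred result; nothing is inserted until get() is called. */
    Supplier<Boolean> addIfAbsent(T value) {
      // Set.add returns true only if the value was not already present.
      return () -> backing.add(value);
    }

    boolean contains(T value) {
      return backing.contains(value);
    }
  }

  public static void main(String[] args) {
    LazySet<String> set = new LazySet<>();
    Supplier<Boolean> pending = set.addIfAbsent("a");

    System.out.println(set.contains("a")); // false: the insert has not run yet
    System.out.println(pending.get());     // true: the value was absent and is now added
    System.out.println(set.contains("a")); // true
  }
}
```

If the Fn API version is meant to match this, the insert would belong inside `read()`; if it is meant to insert eagerly, the two runners would differ for callers that never read the result.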





