lukecwik commented on a change in pull request #15238:
URL: https://github.com/apache/beam/pull/15238#discussion_r681976394



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
##########
@@ -325,7 +329,94 @@ public void clear() {
 
   @Override
   public <T> SetState<T> bindSet(String id, StateSpec<SetState<T>> spec, Coder<T> elemCoder) {
-    throw new UnsupportedOperationException("TODO: Add support for a map state to the Fn API.");
+    return (SetState<T>)
+        stateKeyObjectCache.computeIfAbsent(
+            createMultimapUserStateKey(id),
+            new Function<StateKey, Object>() {
+              @Override
+              public Object apply(StateKey key) {
+                return new SetState<T>() {
+                  private final MultimapUserState<T, Boolean> impl =
+                      createMultimapUserState(id, elemCoder, BooleanCoder.of());
+
+                  @Override
+                  public void clear() {
+                    impl.clear();
+                  }
+
+                  @Override
+                  public ReadableState<Boolean> contains(T t) {
+                    return new ReadableState<Boolean>() {
+                      @Override
+                      public @Nullable Boolean read() {
+                        return !Iterables.isEmpty(impl.get(t));
+                      }
+
+                      @Override
+                      public ReadableState<Boolean> readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Override
+                  public ReadableState<Boolean> addIfAbsent(T t) {
+                    return new ReadableState<Boolean>() {
+                      @Override
+                      public @Nullable Boolean read() {
+                        if (!contains(t).read()) {
+                          add(t);
+                          return true;
+                        }
+                        return false;
+                      }
+
+                      @Override
+                      public ReadableState<Boolean> readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Override
+                  public void remove(T t) {
+                    impl.remove(t);
+                  }
+
+                  @Override
+                  public void add(T value) {
+                    impl.remove(value);

Review comment:
       Could this just be addIfAbsent(value).read()?
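
A minimal sketch of the shape being suggested, assuming a plain in-memory map as a stand-in for the multimap state. `MultimapBackedSet` and `markerCount` are hypothetical names used only for illustration; they are not part of the PR or of Beam, and the real `MultimapUserState` API may behave differently. The point is only that `add()` can delegate to `addIfAbsent()` rather than removing the key first:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a set stored as a multimap from element to a Boolean marker.
// This is NOT Beam's MultimapUserState; it only mimics get/put/remove for illustration.
class MultimapBackedSet<T> {
  private final Map<T, List<Boolean>> impl = new HashMap<>();

  // An element is present when at least one marker is stored under it.
  boolean contains(T t) {
    List<Boolean> values = impl.get(t);
    return values != null && !values.isEmpty();
  }

  // Writes the marker only when the element is missing; returns true if it was added.
  boolean addIfAbsent(T t) {
    if (contains(t)) {
      return false;
    }
    impl.computeIfAbsent(t, k -> new ArrayList<>()).add(true);
    return true;
  }

  // add() reuses addIfAbsent() instead of removing and re-inserting the key.
  void add(T t) {
    addIfAbsent(t);
  }

  void remove(T t) {
    impl.remove(t);
  }

  // Number of markers stored for an element, exposed only so the sketch can be checked.
  int markerCount(T t) {
    List<Boolean> values = impl.get(t);
    return values == null ? 0 : values.size();
  }
}
```

With this shape, repeated `add()` calls for the same element leave a single marker behind, so no preceding `remove()` is needed to avoid duplicates. The trade-off is that each `add()` performs a read first.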



