shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1492637135


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java:
##########
@@ -598,13 +606,81 @@ public <KeyT, ValueT> MultimapState<KeyT, ValueT> 
bindMultimap(
   }
 
   @Override
+  @SuppressWarnings("unchecked")
   public <T> OrderedListState<T> bindOrderedList(
       String id, StateSpec<OrderedListState<T>> spec, Coder<T> elemCoder) {
-    throw new UnsupportedOperationException(
-        "TODO: Add support for a sorted-list state to the Fn API.");
+    return (OrderedListState<T>)
+        stateKeyObjectCache.computeIfAbsent(
+            createOrderedListUserStateKey(id),
+            new Function<StateKey, Object>() {
+              @Override
+              public Object apply(StateKey key) {
+                return new OrderedListState<T>() {
+                  private final OrderedListUserState<T> impl =
+                      createOrderedListUserState(key, elemCoder);
+
+                  @Override
+                  public void clear() {
+                    clearRange(
+                        BoundedWindow.TIMESTAMP_MIN_VALUE, 
BoundedWindow.TIMESTAMP_MAX_VALUE);
+                  }
+
+                  @Override
+                  public void add(TimestampedValue<T> value) {
+                    impl.add(value);
+                  }
+
+                  @Override
+                  public ReadableState<Boolean> isEmpty() {
+                    return new ReadableState<Boolean>() {
+                      @Override
+                      public @Nullable Boolean read() {
+                        return !impl.read().iterator().hasNext();
+                      }
+
+                      @Override
+                      public ReadableState<Boolean> readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Nullable
+                  @Override
+                  public Iterable<TimestampedValue<T>> read() {
+                    return readRange(
+                        BoundedWindow.TIMESTAMP_MIN_VALUE, 
BoundedWindow.TIMESTAMP_MAX_VALUE);
+                  }
+
+                  @Override
+                  public GroupingState<TimestampedValue<T>, 
Iterable<TimestampedValue<T>>>
+                      readLater() {
+                    throw new UnsupportedOperationException();

Review Comment:
   Ok. Will revise it accordingly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to