shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1635632473


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java:
##########
@@ -600,8 +603,73 @@ public <KeyT, ValueT> MultimapState<KeyT, ValueT> bindMultimap(
   @Override
   public <T> OrderedListState<T> bindOrderedList(
       String id, StateSpec<OrderedListState<T>> spec, Coder<T> elemCoder) {
-    throw new UnsupportedOperationException(
-        "TODO: Add support for a sorted-list state to the Fn API.");
+    return (OrderedListState<T>)
+        stateKeyObjectCache.computeIfAbsent(
+            createOrderedListUserStateKey(id),
+            new Function<StateKey, Object>() {
+              @Override
+              public Object apply(StateKey key) {
+                return new OrderedListState<T>() {
+                  private final OrderedListUserState<T> impl =
+                      createOrderedListUserState(key, elemCoder);
+
+                  @Override
+                  public void clear() {
+                    impl.clear();
+                  }
+
+                  @Override
+                  public void add(TimestampedValue<T> value) {
+                    impl.add(value);
+                  }
+
+                  @Override
+                  public ReadableState<Boolean> isEmpty() {
+                    return new ReadableState<Boolean>() {
+                      @Override
+                      public @Nullable Boolean read() {
+                        return !impl.read().iterator().hasNext();
+                      }
+
+                      @Override
+                      public ReadableState<Boolean> readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Nullable
+                  @Override
+                  public Iterable<TimestampedValue<T>> read() {
+                    return readRange(
+                        Instant.ofEpochMilli(Long.MIN_VALUE), Instant.ofEpochMilli(Long.MAX_VALUE));

Review Comment:
   Right. As I mentioned to @acrites earlier, we leverage the fact that Joda 
Instants are just a wrapper around an int64 (a `long` in Java). The internal 
sort key is an int64, so the min and max boundaries should also be the min and 
max values of int64.
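
   To illustrate the point, here is a minimal, stdlib-only sketch. It is not 
the Beam implementation: `MillisInstant`, `store`, and this `readRange` are 
hypothetical stand-ins for a Joda `Instant`, the backing ordered state, and the 
range read. The sketch also assumes the upper bound is exclusive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

class FullRangeSketch {
  // Hypothetical stand-in for a Joda Instant: a thin wrapper around epoch millis.
  static final class MillisInstant {
    final long millis;

    MillisInstant(long millis) {
      this.millis = millis;
    }
  }

  // Hypothetical ordered store keyed by the raw int64 sort key.
  static final TreeMap<Long, String> store = new TreeMap<>();

  // Returns values with min <= key < limit, in ascending key order.
  // The exclusive upper bound is an assumption made for this sketch.
  static List<String> readRange(MillisInstant min, MillisInstant limit) {
    return new ArrayList<>(
        store.subMap(min.millis, true, limit.millis, false).values());
  }

  public static void main(String[] args) {
    store.put(-5L, "a");
    store.put(0L, "b");
    store.put(1_000L, "c");

    // Bounding the range by the int64 extremes reads the whole list.
    List<String> all =
        readRange(
            new MillisInstant(Long.MIN_VALUE), new MillisInstant(Long.MAX_VALUE));
    System.out.println(all);
  }
}
```

   Because the wrapper carries nothing but the `long`, the widest range you can 
express in instants matches the widest range of the sort key. Under the 
exclusive-limit assumption, an entry keyed at exactly `Long.MAX_VALUE` would 
fall outside the range.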



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
