scwhittle commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1491549772


##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -1075,6 +1111,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}
 
+// A message describes a sort key range [start, end).
+message OrderedListRange {
+  int64 start = 1;
+  int64 end = 2;
+}
+
+// A data entry which is tagged with a sort key.
+message OrderedListEntry {
+  int64 sort_key = 1;
+  bytes data = 2;
+}
+
+// This request will fetch an ordered list with a sort key range. If the
+// timestamp range is not specified, the runner should use [MIN_INT64,
+// MAX_INT64) by default.
+message OrderedListStateGetRequest {
+  bytes continuation_token = 1;

Review Comment:
   comment on continuation token, ie should be returned by previous response 
and the key/range should match the previous request generating the token



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java:
##########
@@ -598,13 +606,81 @@ public <KeyT, ValueT> MultimapState<KeyT, ValueT> 
bindMultimap(
   }
 
   @Override
+  @SuppressWarnings("unchecked")
   public <T> OrderedListState<T> bindOrderedList(
       String id, StateSpec<OrderedListState<T>> spec, Coder<T> elemCoder) {
-    throw new UnsupportedOperationException(
-        "TODO: Add support for a sorted-list state to the Fn API.");
+    return (OrderedListState<T>)
+        stateKeyObjectCache.computeIfAbsent(
+            createOrderedListUserStateKey(id),
+            new Function<StateKey, Object>() {
+              @Override
+              public Object apply(StateKey key) {
+                return new OrderedListState<T>() {
+                  private final OrderedListUserState<T> impl =
+                      createOrderedListUserState(key, elemCoder);
+
+                  @Override
+                  public void clear() {
+                    clearRange(
+                        BoundedWindow.TIMESTAMP_MIN_VALUE, 
BoundedWindow.TIMESTAMP_MAX_VALUE);
+                  }
+
+                  @Override
+                  public void add(TimestampedValue<T> value) {
+                    impl.add(value);
+                  }
+
+                  @Override
+                  public ReadableState<Boolean> isEmpty() {
+                    return new ReadableState<Boolean>() {
+                      @Override
+                      public @Nullable Boolean read() {
+                        return !impl.read().iterator().hasNext();
+                      }
+
+                      @Override
+                      public ReadableState<Boolean> readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Nullable
+                  @Override
+                  public Iterable<TimestampedValue<T>> read() {
+                    return readRange(
+                        BoundedWindow.TIMESTAMP_MIN_VALUE, 
BoundedWindow.TIMESTAMP_MAX_VALUE);
+                  }
+
+                  @Override
+                  public GroupingState<TimestampedValue<T>, 
Iterable<TimestampedValue<T>>>
+                      readLater() {
+                    throw new UnsupportedOperationException();

Review Comment:
   just doing nothing is valid here and seems better than an exception, ditto 
below



##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -1075,6 +1111,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}
 
+// A message describes a sort key range [start, end).
+message OrderedListRange {
+  int64 start = 1;
+  int64 end = 2;
+}
+
+// A data entry which is tagged with a sort key.
+message OrderedListEntry {
+  int64 sort_key = 1;
+  bytes data = 2;
+}
+
+// This request will fetch an ordered list with a sort key range. If the
+// timestamp range is not specified, the runner should use [MIN_INT64,
+// MAX_INT64) by default.
+message OrderedListStateGetRequest {
+  bytes continuation_token = 1;
+  OrderedListRange range = 2;
+}
+
+// A response to the get state request for an ordered list.
+message OrderedListStateGetResponse {
+  bytes continuation_token = 1;
+  bytes data = 2;

Review Comment:
   how do we return multiple elements if the request was a range?
   
   should we return the sort-key for elements, that seems like part of the 
user-data for example if it's some id/timestamp the user might want it back 
instead of having to duplicate it in the payload as well.
   
   should this be repeated OrderedListEntry instead of data?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to