shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1635632473
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java:
##########
@@ -600,8 +603,73 @@ public <KeyT, ValueT> MultimapState<KeyT, ValueT> bindMultimap(
@Override
public <T> OrderedListState<T> bindOrderedList(
String id, StateSpec<OrderedListState<T>> spec, Coder<T> elemCoder) {
- throw new UnsupportedOperationException(
- "TODO: Add support for a sorted-list state to the Fn API.");
+ return (OrderedListState<T>)
+ stateKeyObjectCache.computeIfAbsent(
+ createOrderedListUserStateKey(id),
+ new Function<StateKey, Object>() {
+ @Override
+ public Object apply(StateKey key) {
+ return new OrderedListState<T>() {
+ private final OrderedListUserState<T> impl =
+ createOrderedListUserState(key, elemCoder);
+
+ @Override
+ public void clear() {
+ impl.clear();
+ }
+
+ @Override
+ public void add(TimestampedValue<T> value) {
+ impl.add(value);
+ }
+
+ @Override
+ public ReadableState<Boolean> isEmpty() {
+ return new ReadableState<Boolean>() {
+ @Override
+ public @Nullable Boolean read() {
+ return !impl.read().iterator().hasNext();
+ }
+
+ @Override
+ public ReadableState<Boolean> readLater() {
+ return this;
+ }
+ };
+ }
+
+ @Nullable
+ @Override
+ public Iterable<TimestampedValue<T>> read() {
+ return readRange(
+ Instant.ofEpochMilli(Long.MIN_VALUE), Instant.ofEpochMilli(Long.MAX_VALUE));
Review Comment:
Right. As I mentioned to @acrites earlier, we rely on the fact that a Joda
Instant is just a wrapper around an int64 (a long in Java). The internal sort
key is an int64, so the min and max boundaries of the full range should also be
the min and max values of int64.
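
To illustrate the reasoning, here is a minimal, self-contained sketch. It does not use Beam or Joda: `SortedByLong` and `FullRangeSketch` are hypothetical stand-ins, and the half-open `[minKey, limitKey)` range is an assumption made for this example, not a statement about the harness implementation. It only shows that when the sort key is a raw int64, a full read can be expressed as a range read bounded by `Long.MIN_VALUE` and `Long.MAX_VALUE`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class FullRangeSketch {
  /** Hypothetical stand-in for an ordered list whose sort key is a raw int64. */
  static final class SortedByLong {
    private final TreeMap<Long, List<String>> entries = new TreeMap<>();

    void add(long sortKey, String value) {
      entries.computeIfAbsent(sortKey, k -> new ArrayList<>()).add(value);
    }

    /** Values with minKey <= sortKey < limitKey, in key order (half-open by assumption). */
    List<String> readRange(long minKey, long limitKey) {
      List<String> out = new ArrayList<>();
      if (minKey >= limitKey) {
        return out; // empty or inverted range
      }
      entries.subMap(minKey, true, limitKey, false).values().forEach(out::addAll);
      return out;
    }

    /** A full read is a range read bounded by the extremes of int64. */
    List<String> read() {
      return readRange(Long.MIN_VALUE, Long.MAX_VALUE);
    }
  }

  static List<String> demo() {
    SortedByLong list = new SortedByLong();
    list.add(0L, "epoch");
    list.add(Long.MIN_VALUE, "earliest");
    list.add(Long.MAX_VALUE - 1, "latest");
    return list.read();
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints [earliest, epoch, latest]
  }
}
```

Under the half-open assumption, an entry whose key is exactly `Long.MAX_VALUE` would fall outside this range; whether that matters depends on which timestamps the runner allows.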