acrites commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1635154541
##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java:
##########
@@ -143,34 +178,162 @@ public CompletableFuture<StateResponse>
handle(StateRequest.Builder requestBuild
switch (request.getRequestCase()) {
case GET:
- List<ByteString> byteStrings =
- data.getOrDefault(request.getStateKey(),
Collections.singletonList(ByteString.EMPTY));
- int block = 0;
- if (request.getGet().getContinuationToken().size() > 0) {
- block =
Integer.parseInt(request.getGet().getContinuationToken().toStringUtf8());
- }
- ByteString returnBlock = byteStrings.get(block);
- ByteString continuationToken = ByteString.EMPTY;
- if (byteStrings.size() > block + 1) {
- continuationToken = ByteString.copyFromUtf8(Integer.toString(block +
1));
+ if (key.getTypeCase() != TypeCase.ORDERED_LIST_USER_STATE) {
+ List<ByteString> byteStrings =
+ data.getOrDefault(request.getStateKey(),
Collections.singletonList(ByteString.EMPTY));
+ int block = 0;
+ if (!request.getGet().getContinuationToken().isEmpty()) {
+ block =
Integer.parseInt(request.getGet().getContinuationToken().toStringUtf8());
+ }
+ ByteString returnBlock = byteStrings.get(block);
+ ByteString continuationToken = ByteString.EMPTY;
+ if (byteStrings.size() > block + 1) {
+ continuationToken = ByteString.copyFromUtf8(Integer.toString(block
+ 1));
+ }
+ response =
+ StateResponse.newBuilder()
+ .setGet(
+ StateGetResponse.newBuilder()
+ .setData(returnBlock)
+ .setContinuationToken(continuationToken));
+ } else {
+ long start = key.getOrderedListUserState().getRange().getStart();
+ long end = key.getOrderedListUserState().getRange().getEnd();
+
+ KvCoder<Long, Integer> coder = KvCoder.of(VarLongCoder.of(),
VarIntCoder.of());
+ long sortKey = start;
+ int index = 0;
+ if (!request.getGet().getContinuationToken().isEmpty()) {
+ try {
+ // The continuation format here is the sort key (long) followed
by an index (int)
+ KV<Long, Integer> cursor =
+
coder.decode(request.getGet().getContinuationToken().newInput());
+ sortKey = cursor.getKey();
+ index = cursor.getValue();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ ByteString continuationToken;
+ ByteString returnBlock = ByteString.EMPTY;
+ try {
+ if (sortKey < start || sortKey >= end) {
+ throw new IndexOutOfBoundsException("sort key out of range");
+ }
+
+ StateKey.Builder stateKeyWithoutRange =
request.getStateKey().toBuilder();
+ stateKeyWithoutRange.getOrderedListUserStateBuilder().clearRange();
+ NavigableSet<Long> subset =
+ orderedListSortKeysFromStateKey
+ .getOrDefault(stateKeyWithoutRange.build(), new
TreeSet<>())
+ .subSet(sortKey, true, end, false);
+
+ // get the effective sort key currently, can throw
NoSuchElementException
+ Long nextSortKey = subset.first();
+
+ StateKey.Builder keyBuilder = request.getStateKey().toBuilder();
+ keyBuilder
+ .getOrderedListUserStateBuilder()
+ .getRangeBuilder()
+ .setStart(nextSortKey)
+ .setEnd(nextSortKey + 1);
+ List<ByteString> byteStrings =
+ data.getOrDefault(keyBuilder.build(),
Collections.singletonList(ByteString.EMPTY));
+
+ // get the block specified in continuation token, can throw
IndexOutOfBoundsException
+ returnBlock = byteStrings.get(index);
+
+ if (byteStrings.size() > index + 1) {
+ // more blocks from this sort key
+ index += 1;
+ } else {
+ // finish navigating the current sort key and need to find the
next one,
+ // can throw NoSuchElementException
+ nextSortKey = subset.tailSet(nextSortKey, false).first();
+ index = 0;
+ }
+
+ ByteStringOutputStream outputStream = new ByteStringOutputStream();
+ try {
+ KV<Long, Integer> cursor = KV.of(nextSortKey, index);
+ coder.encode(cursor, outputStream);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ continuationToken = outputStream.toByteString();
+ } catch (NoSuchElementException | IndexOutOfBoundsException e) {
+ continuationToken = ByteString.EMPTY;
+ }
+ response =
+ StateResponse.newBuilder()
+ .setGet(
+ StateGetResponse.newBuilder()
+ .setData(returnBlock)
+ .setContinuationToken(continuationToken));
}
- response =
- StateResponse.newBuilder()
- .setGet(
- StateGetResponse.newBuilder()
- .setData(returnBlock)
- .setContinuationToken(continuationToken));
break;
case CLEAR:
- data.remove(request.getStateKey());
+ if (key.getTypeCase() != TypeCase.ORDERED_LIST_USER_STATE) {
Review Comment:
Same with these if/else blocks.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]