shunping commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1500882918
########## sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java: ########## @@ -174,6 +215,124 @@ public CompletableFuture<StateResponse> handle(StateRequest.Builder requestBuild response = StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance()); break; + case ORDERED_LIST_GET: + { + long start = request.getOrderedListGet().getRange().getStart(); + long end = request.getOrderedListGet().getRange().getEnd(); + + KvCoder<Long, Integer> coder = KvCoder.of(VarLongCoder.of(), VarIntCoder.of()); + long sortKey = start; + int index = 0; + if (request.getOrderedListGet().getContinuationToken().size() > 0) { + try { + // The continuation format here is the sort key (long) followed by an index (int) + KV<Long, Integer> cursor = + coder.decode(request.getOrderedListGet().getContinuationToken().newInput()); + sortKey = cursor.getKey(); + index = cursor.getValue(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + ByteString continuationToken; + ByteString returnBlock = ByteString.EMPTY; + ; + try { + if (sortKey < start || sortKey >= end) { + throw new IndexOutOfBoundsException("sort key out of range"); + } + + NavigableSet<Long> subset = + orderedListKeys + .getOrDefault(request.getStateKey(), new TreeSet<>()) + .subSet(sortKey, true, end, false); + + // get the effective sort key currently, can throw NoSuchElementException + Long nextSortKey = subset.first(); + + StateKey.Builder keyBuilder = request.getStateKey().toBuilder(); + keyBuilder.getOrderedListUserStateBuilder().setSortKey(nextSortKey); + List<ByteString> byteStrings = + data.getOrDefault(keyBuilder.build(), Collections.singletonList(ByteString.EMPTY)); + + // get the block specified in continuation token, can throw IndexOutOfBoundsException + returnBlock = byteStrings.get(index); + + if (byteStrings.size() > index + 1) { + // more blocks from this sort key + index += 1; + } else { + // finish navigating the current sort key and 
need to find the next one, + // can throw NoSuchElementException + nextSortKey = subset.tailSet(nextSortKey, false).first(); + index = 0; + } + + ByteStringOutputStream outputStream = new ByteStringOutputStream(); + try { + KV<Long, Integer> cursor = KV.of(nextSortKey, index); + coder.encode(cursor, outputStream); + } catch (IOException e) { + throw new RuntimeException(e); + } + continuationToken = outputStream.toByteString(); + } catch (NoSuchElementException | IndexOutOfBoundsException e) { + continuationToken = ByteString.EMPTY; + } + response = + StateResponse.newBuilder() + .setOrderedListGet( + OrderedListStateGetResponse.newBuilder() + .setData(returnBlock) + .setContinuationToken(continuationToken)); + } + break; + + case ORDERED_LIST_UPDATE: + for (OrderedListRange r : request.getOrderedListUpdate().getDeletesList()) { + List<Long> keysToRemove = + new ArrayList<>( + orderedListKeys + .getOrDefault(request.getStateKey(), new TreeSet<>()) + .subSet(r.getStart(), true, r.getEnd(), false)); + + for (Long l : keysToRemove) { + StateKey.Builder keyBuilder = request.getStateKey().toBuilder(); + keyBuilder.getOrderedListUserStateBuilder().setSortKey(l); + data.remove(keyBuilder.build()); + orderedListKeys.get(request.getStateKey()).remove(l); + } + } + + for (OrderedListEntry e : request.getOrderedListUpdate().getInsertsList()) { + StateKey.Builder keyBuilder = request.getStateKey().toBuilder(); + keyBuilder.getOrderedListUserStateBuilder().setSortKey(e.getSortKey()); + + ByteStringOutputStream outStream = new ByteStringOutputStream(); + + try { + InstantCoder.of().encode(Instant.ofEpochMilli(e.getSortKey()), outStream); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + // In the response, the value encoded bytes are placed before the timestamp encoded bytes. Review Comment: ACK. I find a better way to do this. 
Specifically, I can reuse TimestampedValueCoder and implement a special coder for ByteString that doesn't write the string length during encoding. Then I can use `TimestampedValueCoder.of(NewCoder)` to encode the TimestampedValue, like https://github.com/apache/beam/blob/33e748d175079c68f4fc2935a396cb10a4582637/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java#L341 This both avoids the decoding overhead in the fake client and leverages TimestampedValueCoder, without having to explicitly document the expected buffer layout in a comment like I did before https://github.com/apache/beam/blob/5d7cd5e2e2e6b966ddb47cd78135b58013622d23/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java#L319 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org